Extract cpp from hpp in distributed, fix includes

Summary:
Also removed some convenience code that became obsolete.
No logic changes.

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1303
This commit is contained in:
florijan 2018-03-19 14:42:32 +01:00
parent e77f18bebc
commit eb0e2cb31b
23 changed files with 1092 additions and 883 deletions

View File

@ -16,8 +16,17 @@ set(memgraph_src_files
database/state_delta.cpp database/state_delta.cpp
distributed/coordination_master.cpp distributed/coordination_master.cpp
distributed/coordination_worker.cpp distributed/coordination_worker.cpp
distributed/index_rpc_server.cpp
distributed/plan_consumer.cpp distributed/plan_consumer.cpp
distributed/plan_dispatcher.cpp distributed/plan_dispatcher.cpp
distributed/remote_cache.cpp
distributed/remote_data_manager.cpp
distributed/remote_data_rpc_clients.cpp
distributed/remote_data_rpc_server.cpp
distributed/remote_produce_rpc_server.cpp
distributed/remote_pull_rpc_clients.cpp
distributed/remote_updates_rpc_clients.cpp
distributed/remote_updates_rpc_server.cpp
durability/paths.cpp durability/paths.cpp
durability/recovery.cpp durability/recovery.cpp
durability/snapshooter.cpp durability/snapshooter.cpp

View File

@ -93,7 +93,7 @@ VertexAccessor GraphDbAccessor::InsertVertexIntoRemote(
for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second); for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second);
db().remote_data_manager() db().remote_data_manager()
.Vertices(transaction_id()) .Elements<Vertex>(transaction_id())
.emplace(gid, nullptr, std::move(vertex)); .emplace(gid, nullptr, std::move(vertex));
return VertexAccessor({gid, worker_id}, *this); return VertexAccessor({gid, worker_id}, *this);
} }
@ -417,12 +417,12 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
transaction_id(), from, to, edge_type); transaction_id(), from, to, edge_type);
from_updated = db().remote_data_manager() from_updated = db().remote_data_manager()
.Vertices(transaction_id()) .Elements<Vertex>(transaction_id())
.FindNew(from.gid()); .FindNew(from.gid());
// Create an Edge and insert it into the RemoteCache so we see it locally. // Create an Edge and insert it into the RemoteCache so we see it locally.
db().remote_data_manager() db().remote_data_manager()
.Edges(transaction_id()) .Elements<Edge>(transaction_id())
.emplace( .emplace(
edge_address.gid(), nullptr, edge_address.gid(), nullptr,
std::make_unique<Edge>(from.address(), to.address(), edge_type)); std::make_unique<Edge>(from.address(), to.address(), edge_type));
@ -446,8 +446,9 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
transaction_id(), from, transaction_id(), from,
db().storage().GlobalizedAddress(edge_address), to, edge_type); db().storage().GlobalizedAddress(edge_address), to, edge_type);
} }
to_updated = to_updated = db().remote_data_manager()
db().remote_data_manager().Vertices(transaction_id()).FindNew(to.gid()); .Elements<Vertex>(transaction_id())
.FindNew(to.gid());
} }
to_updated->in_.emplace( to_updated->in_.emplace(
db_.storage().LocalizedAddressIfPossible(from.address()), edge_address, db_.storage().LocalizedAddressIfPossible(from.address()), edge_address,

View File

@ -3,7 +3,6 @@
#include "glog/logging.h" #include "glog/logging.h"
#include "data_structures/concurrent/concurrent_map.hpp" #include "data_structures/concurrent/concurrent_map.hpp"
#include "database/graph_db.hpp"
#include "database/indexes/index_common.hpp" #include "database/indexes/index_common.hpp"
#include "mvcc/version_list.hpp" #include "mvcc/version_list.hpp"
#include "storage/edge.hpp" #include "storage/edge.hpp"

View File

@ -3,7 +3,7 @@
#include <experimental/optional> #include <experimental/optional>
#include "data_structures/concurrent/concurrent_map.hpp" #include "data_structures/concurrent/concurrent_map.hpp"
#include "database/graph_db.hpp" #include "data_structures/concurrent/concurrent_set.hpp"
#include "database/indexes/index_common.hpp" #include "database/indexes/index_common.hpp"
#include "mvcc/version_list.hpp" #include "mvcc/version_list.hpp"
#include "storage/edge.hpp" #include "storage/edge.hpp"
@ -259,7 +259,8 @@ class LabelPropertyIndex {
auto access = GetKeyStorage(key)->access(); auto access = GetKeyStorage(key)->access();
// create the iterator startpoint based on the lower bound // create the iterator startpoint based on the lower bound
auto start_iter = lower ? access.find_or_larger(make_index_bound( auto start_iter = lower
? access.find_or_larger(make_index_bound(
lower, lower.value().IsInclusive())) lower, lower.value().IsInclusive()))
: access.begin(); : access.begin();

View File

@ -0,0 +1,33 @@
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/index_rpc_server.hpp"
namespace distributed {
IndexRpcServer::IndexRpcServer(database::GraphDb &db,
communication::rpc::Server &server)
: db_(db), rpc_server_(server) {
rpc_server_.Register<BuildIndexRpc>([this](const BuildIndexReq &req) {
database::LabelPropertyIndex::Key key{req.member.label,
req.member.property};
database::GraphDbAccessor dba(db_, req.member.tx_id);
if (db_.storage().label_property_index_.CreateIndex(key) == false) {
// If we are a distributed worker we just have to wait till the index
// (which should be in progress of being created) is created so that our
// return guarantess that the index has been built - this assumes that
// no worker thread that is creating an index will fail
while (!dba.LabelPropertyIndexExists(key.label_, key.property_)) {
// TODO reconsider this constant, currently rule-of-thumb chosen
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
} else {
dba.PopulateIndex(key);
dba.EnableIndex(key);
}
return std::make_unique<BuildIndexRes>();
});
}
} // namespace distributed

View File

@ -1,38 +1,18 @@
#pragma once #pragma once
#include "database/graph_db.hpp" namespace communication::rpc {
#include "database/graph_db_accessor.hpp" class Server;
#include "distributed/index_rpc_messages.hpp" }
using namespace database; namespace database {
class GraphDb;
}
namespace distributed { namespace distributed {
class IndexRpcServer { class IndexRpcServer {
public: public:
IndexRpcServer(database::GraphDb &db, communication::rpc::Server &server) IndexRpcServer(database::GraphDb &db, communication::rpc::Server &server);
: db_(db), rpc_server_(server) {
rpc_server_.Register<BuildIndexRpc>([this](const BuildIndexReq &req) {
LabelPropertyIndex::Key key{req.member.label, req.member.property};
GraphDbAccessor dba(db_, req.member.tx_id);
if (db_.storage().label_property_index_.CreateIndex(key) == false) {
// If we are a distributed worker we just have to wait till the index
// (which should be in progress of being created) is created so that our
// return guarantess that the index has been built - this assumes that
// no worker thread that is creating an index will fail
while (!dba.LabelPropertyIndexExists(key.label_, key.property_)) {
// TODO reconsider this constant, currently rule-of-thumb chosen
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
} else {
dba.PopulateIndex(key);
dba.EnableIndex(key);
}
return std::make_unique<BuildIndexRes>();
});
}
private: private:
database::GraphDb &db_; database::GraphDb &db_;

View File

@ -0,0 +1,101 @@
#include "glog/logging.h"
#include "database/storage.hpp"
#include "distributed/remote_cache.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
namespace distributed {
template <typename TRecord>
TRecord *RemoteCache<TRecord>::FindNew(gid::Gid gid) {
std::lock_guard<std::mutex> guard{lock_};
auto found = cache_.find(gid);
DCHECK(found != cache_.end())
<< "FindNew for uninitialized remote Vertex/Edge";
auto &pair = found->second;
if (!pair.second) {
pair.second = std::unique_ptr<TRecord>(pair.first->CloneData());
}
return pair.second.get();
}
template <typename TRecord>
void RemoteCache<TRecord>::FindSetOldNew(tx::transaction_id_t tx_id,
int worker_id, gid::Gid gid,
TRecord *&old_record,
TRecord *&new_record) {
{
std::lock_guard<std::mutex> guard(lock_);
auto found = cache_.find(gid);
if (found != cache_.end()) {
old_record = found->second.first.get();
new_record = found->second.second.get();
return;
}
}
auto remote =
remote_data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
LocalizeAddresses(*remote);
// This logic is a bit strange because we need to make sure that someone
// else didn't get a response and updated the cache before we did and we
// need a lock for that, but we also need to check if we can now return
// that result - otherwise we could get incosistent results for remote
// FindSetOldNew
std::lock_guard<std::mutex> guard(lock_);
auto it_pair = cache_.emplace(
gid, std::make_pair<rec_uptr, rec_uptr>(std::move(remote), nullptr));
old_record = it_pair.first->second.first.get();
new_record = it_pair.first->second.second.get();
}
template <typename TRecord>
void RemoteCache<TRecord>::emplace(gid::Gid gid, rec_uptr old_record,
rec_uptr new_record) {
if (old_record) LocalizeAddresses(*old_record);
if (new_record) LocalizeAddresses(*new_record);
std::lock_guard<std::mutex> guard{lock_};
// We can't replace existing data because some accessors might be using
// it.
// TODO - consider if it's necessary and OK to copy just the data content.
auto found = cache_.find(gid);
if (found != cache_.end())
return;
else
cache_[gid] = std::make_pair(std::move(old_record), std::move(new_record));
}
template <typename TRecord>
void RemoteCache<TRecord>::ClearCache() {
std::lock_guard<std::mutex> guard{lock_};
cache_.clear();
}
template <>
void RemoteCache<Vertex>::LocalizeAddresses(Vertex &vertex) {
auto localize_edges = [this](auto &edges) {
for (auto &element : edges) {
element.vertex = storage_.LocalizedAddressIfPossible(element.vertex);
element.edge = storage_.LocalizedAddressIfPossible(element.edge);
}
};
localize_edges(vertex.in_.storage());
localize_edges(vertex.out_.storage());
}
template <>
void RemoteCache<Edge>::LocalizeAddresses(Edge &edge) {
edge.from_ = storage_.LocalizedAddressIfPossible(edge.from_);
edge.to_ = storage_.LocalizedAddressIfPossible(edge.to_);
}
template class RemoteCache<Vertex>;
template class RemoteCache<Edge>;
} // namespace distributed

View File

@ -3,14 +3,12 @@
#include <mutex> #include <mutex>
#include <unordered_map> #include <unordered_map>
#include "glog/logging.h"
#include "database/storage.hpp"
#include "distributed/remote_data_rpc_clients.hpp" #include "distributed/remote_data_rpc_clients.hpp"
#include "storage/edge.hpp"
#include "storage/gid.hpp" #include "storage/gid.hpp"
#include "storage/vertex.hpp"
#include "transactions/transaction.hpp" namespace database {
class Storage;
}
namespace distributed { namespace distributed {
@ -34,78 +32,20 @@ class RemoteCache {
/// Returns the new data for the given ID. Creates it (as copy of old) if /// Returns the new data for the given ID. Creates it (as copy of old) if
/// necessary. /// necessary.
TRecord *FindNew(gid::Gid gid) { TRecord *FindNew(gid::Gid gid);
std::lock_guard<std::mutex> guard{lock_};
auto found = cache_.find(gid);
DCHECK(found != cache_.end())
<< "FindNew for uninitialized remote Vertex/Edge";
auto &pair = found->second;
if (!pair.second) {
pair.second = std::unique_ptr<TRecord>(pair.first->CloneData());
}
return pair.second.get();
}
/** /// For the Vertex/Edge with the given global ID, looks for the data visible
* For the Vertex/Edge with the given global ID, looks for the data visible /// from the given transaction's ID and command ID, and caches it. Sets the
* from the given transaction's ID and command ID, and caches it. Sets the /// given pointers to point to the fetched data. Analogue to
* given pointers to point to the fetched data. Analogue to /// mvcc::VersionList::find_set_old_new.
* mvcc::VersionList::find_set_old_new.
*/
void FindSetOldNew(tx::transaction_id_t tx_id, int worker_id, gid::Gid gid, void FindSetOldNew(tx::transaction_id_t tx_id, int worker_id, gid::Gid gid,
TRecord *&old_record, TRecord *&new_record) { TRecord *&old_record, TRecord *&new_record);
{
std::lock_guard<std::mutex> guard(lock_);
auto found = cache_.find(gid);
if (found != cache_.end()) {
old_record = found->second.first.get();
new_record = found->second.second.get();
return;
}
}
auto remote = /// Sets the given records as (new, old) data for the given gid.
remote_data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid); void emplace(gid::Gid gid, rec_uptr old_record, rec_uptr new_record);
LocalizeAddresses(*remote);
// This logic is a bit strange because we need to make sure that someone /// Removes all the data from the cache.
// else didn't get a response and updated the cache before we did and we void ClearCache();
// need a lock for that, but we also need to check if we can now return
// that result - otherwise we could get incosistent results for remote
// FindSetOldNew
std::lock_guard<std::mutex> guard(lock_);
auto it_pair = cache_.emplace(
gid, std::make_pair<rec_uptr, rec_uptr>(std::move(remote), nullptr));
old_record = it_pair.first->second.first.get();
new_record = it_pair.first->second.second.get();
}
/** Sets the given records as (new, old) data for the given gid. */
void emplace(gid::Gid gid, rec_uptr old_record, rec_uptr new_record) {
if (old_record) LocalizeAddresses(*old_record);
if (new_record) LocalizeAddresses(*new_record);
std::lock_guard<std::mutex> guard{lock_};
// We can't replace existing data because some accessors might be using
// it.
// TODO - consider if it's necessary and OK to copy just the data content.
auto found = cache_.find(gid);
if (found != cache_.end())
return;
else
cache_[gid] =
std::make_pair(std::move(old_record), std::move(new_record));
}
/// Removes all the cached data. All the pointers to that data still held by
/// RecordAccessors will become invalid and must never be dereferenced after
/// this call. To make a RecordAccessor valid again Reconstruct must be
/// called on it. This is typically done after the command advanced.
void ClearCache() {
std::lock_guard<std::mutex> guard{lock_};
cache_.clear();
}
private: private:
database::Storage &storage_; database::Storage &storage_;
@ -120,22 +60,4 @@ class RemoteCache {
void LocalizeAddresses(TRecord &record); void LocalizeAddresses(TRecord &record);
}; };
template <>
inline void RemoteCache<Vertex>::LocalizeAddresses(Vertex &vertex) {
auto localize_edges = [this](auto &edges) {
for (auto &element : edges) {
element.vertex = storage_.LocalizedAddressIfPossible(element.vertex);
element.edge = storage_.LocalizedAddressIfPossible(element.edge);
}
};
localize_edges(vertex.in_.storage());
localize_edges(vertex.out_.storage());
}
template <>
inline void RemoteCache<Edge>::LocalizeAddresses(Edge &edge) {
edge.from_ = storage_.LocalizedAddressIfPossible(edge.from_);
edge.to_ = storage_.LocalizedAddressIfPossible(edge.to_);
}
} // namespace distributed } // namespace distributed

View File

@ -0,0 +1,59 @@
#include "distributed/remote_data_manager.hpp"
#include "database/storage.hpp"
namespace distributed {
template <typename TRecord>
RemoteCache<TRecord> &RemoteDataManager::GetCache(CacheT<TRecord> &collection,
tx::transaction_id_t tx_id) {
auto access = collection.access();
auto found = access.find(tx_id);
if (found != access.end()) return found->second;
return access
.emplace(
tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(storage_), std::ref(remote_data_clients_)))
.first->second;
}
template <>
RemoteCache<Vertex> &RemoteDataManager::Elements<Vertex>(
tx::transaction_id_t tx_id) {
return GetCache(vertices_caches_, tx_id);
}
template <>
RemoteCache<Edge> &RemoteDataManager::Elements<Edge>(
tx::transaction_id_t tx_id) {
return GetCache(edges_caches_, tx_id);
}
RemoteDataManager::RemoteDataManager(
database::Storage &storage,
distributed::RemoteDataRpcClients &remote_data_clients)
: storage_(storage), remote_data_clients_(remote_data_clients) {}
void RemoteDataManager::ClearCacheForSingleTransaction(
tx::transaction_id_t tx_id) {
Elements<Vertex>(tx_id).ClearCache();
Elements<Edge>(tx_id).ClearCache();
}
void RemoteDataManager::ClearTransactionalCache(
tx::transaction_id_t oldest_active) {
auto vertex_access = vertices_caches_.access();
for (auto &kv : vertex_access) {
if (kv.first < oldest_active) {
vertex_access.remove(kv.first);
}
}
auto edge_access = edges_caches_.access();
for (auto &kv : edge_access) {
if (kv.first < oldest_active) {
edge_access.remove(kv.first);
}
}
}
} // namespace distributed

View File

@ -1,87 +1,49 @@
#pragma once #pragma once
#include "data_structures/concurrent/concurrent_map.hpp" #include "data_structures/concurrent/concurrent_map.hpp"
#include "database/storage.hpp"
#include "distributed/remote_cache.hpp" #include "distributed/remote_cache.hpp"
#include "distributed/remote_data_rpc_clients.hpp" #include "distributed/remote_data_rpc_clients.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
#include "transactions/type.hpp" #include "transactions/type.hpp"
class Vertex;
class Edge;
namespace database {
class Storage;
}
namespace distributed { namespace distributed {
/** Handles remote data caches for edges and vertices, per transaction. */ /// Handles remote data caches for edges and vertices, per transaction.
class RemoteDataManager { class RemoteDataManager {
// Helper, gets or inserts a data cache for the given transaction. template <typename TRecord>
template <typename TCollection> using CacheT = ConcurrentMap<tx::transaction_id_t, RemoteCache<TRecord>>;
auto &GetCache(TCollection &collection, tx::transaction_id_t tx_id) {
auto access = collection.access();
auto found = access.find(tx_id);
if (found != access.end()) return found->second;
return access // Helper, gets or inserts a data cache for the given transaction.
.emplace( template <typename TRecord>
tx_id, std::make_tuple(tx_id), RemoteCache<TRecord> &GetCache(CacheT<TRecord> &collection,
std::make_tuple(std::ref(storage_), std::ref(remote_data_clients_))) tx::transaction_id_t tx_id);
.first->second;
}
public: public:
RemoteDataManager(database::Storage &storage, RemoteDataManager(database::Storage &storage,
distributed::RemoteDataRpcClients &remote_data_clients) distributed::RemoteDataRpcClients &remote_data_clients);
: storage_(storage), remote_data_clients_(remote_data_clients) {}
/// Gets or creates the remote vertex cache for the given transaction.
auto &Vertices(tx::transaction_id_t tx_id) {
return GetCache(vertices_caches_, tx_id);
}
/// Gets or creates the remote edge cache for the given transaction.
auto &Edges(tx::transaction_id_t tx_id) {
return GetCache(edges_caches_, tx_id);
}
/// Gets or creates the remote vertex/edge cache for the given transaction. /// Gets or creates the remote vertex/edge cache for the given transaction.
template <typename TRecord> template <typename TRecord>
auto &Elements(tx::transaction_id_t tx_id); RemoteCache<TRecord> &Elements(tx::transaction_id_t tx_id);
/// Removes all the caches for a single transaction. /// Removes all the caches for a single transaction.
void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id) { void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id);
Vertices(tx_id).ClearCache();
Edges(tx_id).ClearCache();
}
/// Clears the cache of local transactions that have expired. The signature of /// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::CacheCleaner`. /// this method is dictated by `distributed::CacheCleaner`.
void ClearTransactionalCache(tx::transaction_id_t oldest_active) { void ClearTransactionalCache(tx::transaction_id_t oldest_active);
auto vertex_access = vertices_caches_.access();
for (auto &kv : vertex_access) {
if (kv.first < oldest_active) {
vertex_access.remove(kv.first);
}
}
auto edge_access = edges_caches_.access();
for (auto &kv : edge_access) {
if (kv.first < oldest_active) {
edge_access.remove(kv.first);
}
}
}
private: private:
database::Storage &storage_; database::Storage &storage_;
RemoteDataRpcClients &remote_data_clients_; RemoteDataRpcClients &remote_data_clients_;
ConcurrentMap<tx::transaction_id_t, RemoteCache<Vertex>> vertices_caches_; CacheT<Vertex> vertices_caches_;
ConcurrentMap<tx::transaction_id_t, RemoteCache<Edge>> edges_caches_; CacheT<Edge> edges_caches_;
}; };
template <>
inline auto &RemoteDataManager::Elements<Vertex>(tx::transaction_id_t tx_id) {
return Vertices(tx_id);
}
template <>
inline auto &RemoteDataManager::Elements<Edge>(tx::transaction_id_t tx_id) {
return Edges(tx_id);
}
} // namespace distributed } // namespace distributed

View File

@ -0,0 +1,26 @@
#include "distributed/remote_data_rpc_clients.hpp"
#include "distributed/remote_data_rpc_messages.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
namespace distributed {
template <>
std::unique_ptr<Edge> RemoteDataRpcClients::RemoteElement(
int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
auto response = clients_.GetClientPool(worker_id).Call<RemoteEdgeRpc>(
TxGidPair{tx_id, gid});
CHECK(response) << "RemoteEdgeRpc failed";
return std::move(response->name_output_);
}
template <>
std::unique_ptr<Vertex> RemoteDataRpcClients::RemoteElement(
int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
auto response = clients_.GetClientPool(worker_id).Call<RemoteVertexRpc>(
TxGidPair{tx_id, gid});
CHECK(response) << "RemoteVertexRpc failed";
return std::move(response->name_output_);
}
} // namespace distributed

View File

@ -3,42 +3,19 @@
#include <mutex> #include <mutex>
#include <utility> #include <utility>
#include "distributed/coordination.hpp"
#include "distributed/remote_data_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp" #include "distributed/rpc_worker_clients.hpp"
#include "storage/gid.hpp" #include "storage/gid.hpp"
#include "transactions/type.hpp" #include "transactions/type.hpp"
namespace distributed { namespace distributed {
/** Provides access to other worker's data. */ /// Provides access to other worker's data.
class RemoteDataRpcClients { class RemoteDataRpcClients {
public: public:
RemoteDataRpcClients(RpcWorkerClients &clients) : clients_(clients) {} RemoteDataRpcClients(RpcWorkerClients &clients) : clients_(clients) {}
/// Returns a remote worker's record (vertex/edge) data for the given params.
/// Returns a remote worker's data for the given params. That worker must own /// That worker must own the vertex/edge for the given id, and that vertex
/// the vertex for the given id, and that vertex must be visible in given /// must be visible in given transaction.
/// transaction.
std::unique_ptr<Vertex> RemoteVertex(int worker_id,
tx::transaction_id_t tx_id,
gid::Gid gid) {
auto response = clients_.GetClientPool(worker_id).Call<RemoteVertexRpc>(
TxGidPair{tx_id, gid});
CHECK(response) << "RemoteVertexRpc failed";
return std::move(response->name_output_);
}
/// Returns a remote worker's data for the given params. That worker must own
/// the edge for the given id, and that edge must be visible in given
/// transaction.
std::unique_ptr<Edge> RemoteEdge(int worker_id, tx::transaction_id_t tx_id,
gid::Gid gid) {
auto response = clients_.GetClientPool(worker_id).Call<RemoteEdgeRpc>(
TxGidPair{tx_id, gid});
CHECK(response) << "RemoteEdgeRpc failed";
return std::move(response->name_output_);
}
template <typename TRecord> template <typename TRecord>
std::unique_ptr<TRecord> RemoteElement(int worker_id, std::unique_ptr<TRecord> RemoteElement(int worker_id,
tx::transaction_id_t tx_id, tx::transaction_id_t tx_id,
@ -48,16 +25,4 @@ class RemoteDataRpcClients {
RpcWorkerClients &clients_; RpcWorkerClients &clients_;
}; };
template <>
inline std::unique_ptr<Edge> RemoteDataRpcClients::RemoteElement(
int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
return RemoteEdge(worker_id, tx_id, gid);
}
template <>
inline std::unique_ptr<Vertex> RemoteDataRpcClients::RemoteElement(
int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
return RemoteVertex(worker_id, tx_id, gid);
}
} // namespace distributed } // namespace distributed

View File

@ -0,0 +1,28 @@
#include <memory>
#include "database/graph_db_accessor.hpp"
#include "distributed/remote_data_rpc_messages.hpp"
#include "remote_data_rpc_server.hpp"
namespace distributed {
RemoteDataRpcServer::RemoteDataRpcServer(database::GraphDb &db,
communication::rpc::Server &server)
: db_(db), rpc_server_(server) {
rpc_server_.Register<RemoteVertexRpc>([this](const RemoteVertexReq &req) {
database::GraphDbAccessor dba(db_, req.member.tx_id);
auto vertex = dba.FindVertexChecked(req.member.gid, false);
CHECK(vertex.GetOld())
<< "Old record must exist when sending vertex by RPC";
return std::make_unique<RemoteVertexRes>(vertex.GetOld(), db_.WorkerId());
});
rpc_server_.Register<RemoteEdgeRpc>([this](const RemoteEdgeReq &req) {
database::GraphDbAccessor dba(db_, req.member.tx_id);
auto edge = dba.FindEdgeChecked(req.member.gid, false);
CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC";
return std::make_unique<RemoteEdgeRes>(edge.GetOld(), db_.WorkerId());
});
}
} // namespace distributed

View File

@ -1,35 +1,15 @@
#pragma once #pragma once
#include <memory>
#include "communication/rpc/server.hpp" #include "communication/rpc/server.hpp"
#include "database/graph_db.hpp" #include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/remote_data_rpc_messages.hpp"
#include "transactions/type.hpp"
namespace distributed { namespace distributed {
/** Serves this worker's data to others. */ /// Serves this worker's data to others.
class RemoteDataRpcServer { class RemoteDataRpcServer {
public: public:
RemoteDataRpcServer(database::GraphDb &db, communication::rpc::Server &server) RemoteDataRpcServer(database::GraphDb &db,
: db_(db), rpc_server_(server) { communication::rpc::Server &server);
rpc_server_.Register<RemoteVertexRpc>([this](const RemoteVertexReq &req) {
database::GraphDbAccessor dba(db_, req.member.tx_id);
auto vertex = dba.FindVertexChecked(req.member.gid, false);
CHECK(vertex.GetOld())
<< "Old record must exist when sending vertex by RPC";
return std::make_unique<RemoteVertexRes>(vertex.GetOld(), db_.WorkerId());
});
rpc_server_.Register<RemoteEdgeRpc>([this](const RemoteEdgeReq &req) {
database::GraphDbAccessor dba(db_, req.member.tx_id);
auto edge = dba.FindEdgeChecked(req.member.gid, false);
CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC";
return std::make_unique<RemoteEdgeRes>(edge.GetOld(), db_.WorkerId());
});
}
private: private:
database::GraphDb &db_; database::GraphDb &db_;

View File

@ -0,0 +1,170 @@
#include "distributed/remote_produce_rpc_server.hpp"
#include "distributed/remote_data_manager.hpp"
#include "distributed/remote_pull_produce_rpc_messages.hpp"
#include "query/common.hpp"
#include "query/exceptions.hpp"
#include "transactions/engine_worker.hpp"
namespace distributed {
RemoteProduceRpcServer::OngoingProduce::OngoingProduce(
database::GraphDb &db, tx::transaction_id_t tx_id,
std::shared_ptr<query::plan::LogicalOperator> op,
query::SymbolTable symbol_table, Parameters parameters,
std::vector<query::Symbol> pull_symbols)
: dba_{db, tx_id},
cursor_(op->MakeCursor(dba_)),
context_(dba_),
pull_symbols_(std::move(pull_symbols)),
frame_(symbol_table.max_position()) {
context_.symbol_table_ = std::move(symbol_table);
context_.parameters_ = std::move(parameters);
}
std::pair<std::vector<query::TypedValue>, RemotePullState>
RemoteProduceRpcServer::OngoingProduce::Pull() {
if (!accumulation_.empty()) {
auto results = std::move(accumulation_.back());
accumulation_.pop_back();
for (auto &element : results) {
try {
query::ReconstructTypedValue(element);
} catch (query::ReconstructionException &) {
cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR;
return std::make_pair(std::move(results), cursor_state_);
}
}
return std::make_pair(std::move(results),
RemotePullState::CURSOR_IN_PROGRESS);
}
return PullOneFromCursor();
}
RemotePullState RemoteProduceRpcServer::OngoingProduce::Accumulate() {
while (true) {
auto result = PullOneFromCursor();
if (result.second != RemotePullState::CURSOR_IN_PROGRESS)
return result.second;
else
accumulation_.emplace_back(std::move(result.first));
}
}
std::pair<std::vector<query::TypedValue>, RemotePullState>
RemoteProduceRpcServer::OngoingProduce::PullOneFromCursor() {
std::vector<query::TypedValue> results;
// Check if we already exhausted this cursor (or it entered an error
// state). This happens when we accumulate before normal pull.
if (cursor_state_ != RemotePullState::CURSOR_IN_PROGRESS) {
return std::make_pair(results, cursor_state_);
}
try {
if (cursor_->Pull(frame_, context_)) {
results.reserve(pull_symbols_.size());
for (const auto &symbol : pull_symbols_) {
results.emplace_back(std::move(frame_[symbol]));
}
} else {
cursor_state_ = RemotePullState::CURSOR_EXHAUSTED;
}
} catch (const mvcc::SerializationError &) {
cursor_state_ = RemotePullState::SERIALIZATION_ERROR;
} catch (const LockTimeoutException &) {
cursor_state_ = RemotePullState::LOCK_TIMEOUT_ERROR;
} catch (const RecordDeletedError &) {
cursor_state_ = RemotePullState::UPDATE_DELETED_ERROR;
} catch (const query::ReconstructionException &) {
cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR;
} catch (const query::RemoveAttachedVertexException &) {
cursor_state_ = RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR;
} catch (const query::QueryRuntimeException &) {
cursor_state_ = RemotePullState::QUERY_ERROR;
} catch (const query::HintedAbortError &) {
cursor_state_ = RemotePullState::HINTED_ABORT_ERROR;
}
return std::make_pair(std::move(results), cursor_state_);
}
RemoteProduceRpcServer::RemoteProduceRpcServer(
database::GraphDb &db, tx::Engine &tx_engine,
communication::rpc::Server &server,
const distributed::PlanConsumer &plan_consumer)
: db_(db),
remote_produce_rpc_server_(server),
plan_consumer_(plan_consumer),
tx_engine_(tx_engine) {
remote_produce_rpc_server_.Register<RemotePullRpc>(
[this](const RemotePullReq &req) {
return std::make_unique<RemotePullRes>(RemotePull(req));
});
remote_produce_rpc_server_.Register<TransactionCommandAdvancedRpc>(
[this](const TransactionCommandAdvancedReq &req) {
tx_engine_.UpdateCommand(req.member);
db_.remote_data_manager().ClearCacheForSingleTransaction(req.member);
return std::make_unique<TransactionCommandAdvancedRes>();
});
}
void RemoteProduceRpcServer::ClearTransactionalCache(
tx::transaction_id_t oldest_active) {
auto access = ongoing_produces_.access();
for (auto &kv : access) {
if (kv.first.first < oldest_active) {
access.remove(kv.first);
}
}
}
RemoteProduceRpcServer::OngoingProduce &
RemoteProduceRpcServer::GetOngoingProduce(const RemotePullReq &req) {
auto access = ongoing_produces_.access();
auto key_pair = std::make_pair(req.tx_id, req.plan_id);
auto found = access.find(key_pair);
if (found != access.end()) {
return found->second;
}
if (db_.type() == database::GraphDb::Type::DISTRIBUTED_WORKER) {
// On the worker cache the snapshot to have one RPC less.
dynamic_cast<tx::WorkerEngine &>(tx_engine_)
.RunningTransaction(req.tx_id, req.tx_snapshot);
}
auto &plan_pack = plan_consumer_.PlanForId(req.plan_id);
return access
.emplace(key_pair, std::forward_as_tuple(key_pair),
std::forward_as_tuple(db_, req.tx_id, plan_pack.plan,
plan_pack.symbol_table, req.params,
req.symbols))
.first->second;
}
RemotePullResData RemoteProduceRpcServer::RemotePull(const RemotePullReq &req) {
auto &ongoing_produce = GetOngoingProduce(req);
RemotePullResData result{db_.WorkerId(), req.send_old, req.send_new};
result.state_and_frames.pull_state = RemotePullState::CURSOR_IN_PROGRESS;
if (req.accumulate) {
result.state_and_frames.pull_state = ongoing_produce.Accumulate();
// If an error ocurred, we need to return that error.
if (result.state_and_frames.pull_state !=
RemotePullState::CURSOR_EXHAUSTED) {
return result;
}
}
for (int i = 0; i < req.batch_size; ++i) {
auto pull_result = ongoing_produce.Pull();
result.state_and_frames.pull_state = pull_result.second;
if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS) break;
result.state_and_frames.frames.emplace_back(std::move(pull_result.first));
}
return result;
}
} // namespace distributed

View File

@ -9,28 +9,21 @@
#include "database/graph_db.hpp" #include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "distributed/plan_consumer.hpp" #include "distributed/plan_consumer.hpp"
#include "distributed/remote_data_manager.hpp"
#include "distributed/remote_pull_produce_rpc_messages.hpp"
#include "query/common.hpp"
#include "query/context.hpp" #include "query/context.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/semantic/symbol_table.hpp" #include "query/frontend/semantic/symbol_table.hpp"
#include "query/interpret/frame.hpp" #include "query/interpret/frame.hpp"
#include "query/parameters.hpp" #include "query/parameters.hpp"
#include "query/plan/operator.hpp" #include "query/plan/operator.hpp"
#include "query/typed_value.hpp" #include "query/typed_value.hpp"
#include "transactions/engine.hpp" #include "transactions/engine.hpp"
#include "transactions/engine_worker.hpp"
#include "transactions/type.hpp" #include "transactions/type.hpp"
namespace distributed { namespace distributed {
/** /// Handles the execution of a plan on the worker, requested by the remote
* Handles the execution of a plan on the worker, requested by the remote /// master. Assumes that (tx_id, plan_id) uniquely identifies an execution, and
* master. Assumes that (tx_id, plan_id) uniquely identifies an execution, and /// that there will never be parallel requests for the same execution thus
* that there will never be parallel requests for the same execution thus /// identified.
* identified.
*/
class RemoteProduceRpcServer { class RemoteProduceRpcServer {
/// Encapsulates a Cursor execution in progress. Can be used for pulling a /// Encapsulates a Cursor execution in progress. Can be used for pulling a
/// single result from the execution, or pulling all and accumulating the /// single result from the execution, or pulling all and accumulating the
@ -41,50 +34,16 @@ class RemoteProduceRpcServer {
OngoingProduce(database::GraphDb &db, tx::transaction_id_t tx_id, OngoingProduce(database::GraphDb &db, tx::transaction_id_t tx_id,
std::shared_ptr<query::plan::LogicalOperator> op, std::shared_ptr<query::plan::LogicalOperator> op,
query::SymbolTable symbol_table, Parameters parameters, query::SymbolTable symbol_table, Parameters parameters,
std::vector<query::Symbol> pull_symbols) std::vector<query::Symbol> pull_symbols);
: dba_{db, tx_id},
cursor_(op->MakeCursor(dba_)),
context_(dba_),
pull_symbols_(std::move(pull_symbols)),
frame_(symbol_table.max_position()) {
context_.symbol_table_ = std::move(symbol_table);
context_.parameters_ = std::move(parameters);
}
/// Returns a vector of typed values (one for each `pull_symbol`), and an /// Returns a vector of typed values (one for each `pull_symbol`), and an
/// indication of the pull result. The result data is valid only if the /// indication of the pull result. The result data is valid only if the
/// returned state is CURSOR_IN_PROGRESS. /// returned state is CURSOR_IN_PROGRESS.
std::pair<std::vector<query::TypedValue>, RemotePullState> Pull() { std::pair<std::vector<query::TypedValue>, RemotePullState> Pull();
if (!accumulation_.empty()) {
auto results = std::move(accumulation_.back());
accumulation_.pop_back();
for (auto &element : results) {
try {
query::ReconstructTypedValue(element);
} catch (query::ReconstructionException &) {
cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR;
return std::make_pair(std::move(results), cursor_state_);
}
}
return std::make_pair(std::move(results),
RemotePullState::CURSOR_IN_PROGRESS);
}
return PullOneFromCursor();
}
/// Accumulates all the frames pulled from the cursor and returns /// Accumulates all the frames pulled from the cursor and returns
/// CURSOR_EXHAUSTED. If an error occurs, an appropriate value is returned. /// CURSOR_EXHAUSTED. If an error occurs, an appropriate value is returned.
RemotePullState Accumulate() { RemotePullState Accumulate();
while (true) {
auto result = PullOneFromCursor();
if (result.second != RemotePullState::CURSOR_IN_PROGRESS)
return result.second;
else
accumulation_.emplace_back(std::move(result.first));
}
}
private: private:
database::GraphDbAccessor dba_; database::GraphDbAccessor dba_;
@ -95,75 +54,19 @@ class RemoteProduceRpcServer {
RemotePullState cursor_state_{RemotePullState::CURSOR_IN_PROGRESS}; RemotePullState cursor_state_{RemotePullState::CURSOR_IN_PROGRESS};
std::vector<std::vector<query::TypedValue>> accumulation_; std::vector<std::vector<query::TypedValue>> accumulation_;
/// Pulls and returns a single result from the cursor.
std::pair<std::vector<query::TypedValue>, RemotePullState> std::pair<std::vector<query::TypedValue>, RemotePullState>
PullOneFromCursor() { PullOneFromCursor();
std::vector<query::TypedValue> results;
// Check if we already exhausted this cursor (or it entered an error
// state). This happens when we accumulate before normal pull.
if (cursor_state_ != RemotePullState::CURSOR_IN_PROGRESS) {
return std::make_pair(results, cursor_state_);
}
try {
if (cursor_->Pull(frame_, context_)) {
results.reserve(pull_symbols_.size());
for (const auto &symbol : pull_symbols_) {
results.emplace_back(std::move(frame_[symbol]));
}
} else {
cursor_state_ = RemotePullState::CURSOR_EXHAUSTED;
}
} catch (const mvcc::SerializationError &) {
cursor_state_ = RemotePullState::SERIALIZATION_ERROR;
} catch (const LockTimeoutException &) {
cursor_state_ = RemotePullState::LOCK_TIMEOUT_ERROR;
} catch (const RecordDeletedError &) {
cursor_state_ = RemotePullState::UPDATE_DELETED_ERROR;
} catch (const query::ReconstructionException &) {
cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR;
} catch (const query::RemoveAttachedVertexException &) {
cursor_state_ = RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR;
} catch (const query::QueryRuntimeException &) {
cursor_state_ = RemotePullState::QUERY_ERROR;
} catch (const query::HintedAbortError &) {
cursor_state_ = RemotePullState::HINTED_ABORT_ERROR;
}
return std::make_pair(std::move(results), cursor_state_);
}
}; };
public: public:
RemoteProduceRpcServer(database::GraphDb &db, tx::Engine &tx_engine, RemoteProduceRpcServer(database::GraphDb &db, tx::Engine &tx_engine,
communication::rpc::Server &server, communication::rpc::Server &server,
const distributed::PlanConsumer &plan_consumer) const distributed::PlanConsumer &plan_consumer);
: db_(db),
remote_produce_rpc_server_(server),
plan_consumer_(plan_consumer),
tx_engine_(tx_engine) {
remote_produce_rpc_server_.Register<RemotePullRpc>(
[this](const RemotePullReq &req) {
return std::make_unique<RemotePullRes>(RemotePull(req));
});
remote_produce_rpc_server_.Register<TransactionCommandAdvancedRpc>(
[this](const TransactionCommandAdvancedReq &req) {
tx_engine_.UpdateCommand(req.member);
db_.remote_data_manager().ClearCacheForSingleTransaction(req.member);
return std::make_unique<TransactionCommandAdvancedRes>();
});
}
/// Clears the cache of local transactions that have expired. The signature of /// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`. /// this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(tx::transaction_id_t oldest_active) { void ClearTransactionalCache(tx::transaction_id_t oldest_active);
auto access = ongoing_produces_.access();
for (auto &kv : access) {
if (kv.first.first < oldest_active) {
access.remove(kv.first);
}
}
}
private: private:
database::GraphDb &db_; database::GraphDb &db_;
@ -173,50 +76,12 @@ class RemoteProduceRpcServer {
ongoing_produces_; ongoing_produces_;
tx::Engine &tx_engine_; tx::Engine &tx_engine_;
auto &GetOngoingProduce(const RemotePullReq &req) { /// Gets an ongoing produce for the given pull request. Creates a new one if
auto access = ongoing_produces_.access(); /// there is none currently existing.
auto key_pair = std::make_pair(req.tx_id, req.plan_id); OngoingProduce &GetOngoingProduce(const RemotePullReq &req);
auto found = access.find(key_pair);
if (found != access.end()) {
return found->second;
}
if (db_.type() == database::GraphDb::Type::DISTRIBUTED_WORKER) {
// On the worker cache the snapshot to have one RPC less.
dynamic_cast<tx::WorkerEngine &>(tx_engine_)
.RunningTransaction(req.tx_id, req.tx_snapshot);
}
auto &plan_pack = plan_consumer_.PlanForId(req.plan_id);
return access
.emplace(key_pair, std::forward_as_tuple(key_pair),
std::forward_as_tuple(db_, req.tx_id, plan_pack.plan,
plan_pack.symbol_table, req.params,
req.symbols))
.first->second;
}
RemotePullResData RemotePull(const RemotePullReq &req) { /// Performs a single remote pull for the given request.
auto &ongoing_produce = GetOngoingProduce(req); RemotePullResData RemotePull(const RemotePullReq &req);
RemotePullResData result{db_.WorkerId(), req.send_old, req.send_new};
result.state_and_frames.pull_state = RemotePullState::CURSOR_IN_PROGRESS;
if (req.accumulate) {
result.state_and_frames.pull_state = ongoing_produce.Accumulate();
// If an error ocurred, we need to return that error.
if (result.state_and_frames.pull_state !=
RemotePullState::CURSOR_EXHAUSTED) {
return result;
}
}
for (int i = 0; i < req.batch_size; ++i) {
auto pull_result = ongoing_produce.Pull();
result.state_and_frames.pull_state = pull_result.second;
if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS) break;
result.state_and_frames.frames.emplace_back(std::move(pull_result.first));
}
return result;
}
}; };
} // namespace distributed } // namespace distributed

View File

@ -0,0 +1,72 @@
#include <functional>
#include "distributed/remote_data_manager.hpp"
#include "distributed/remote_pull_rpc_clients.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
namespace distributed {
utils::Future<RemotePullData> RemotePullRpcClients::RemotePull(
database::GraphDbAccessor &dba, int worker_id, int64_t plan_id,
const Parameters &params, const std::vector<query::Symbol> &symbols,
bool accumulate, int batch_size) {
return clients_.ExecuteOnWorker<RemotePullData>(
worker_id, [&dba, plan_id, params, symbols, accumulate,
batch_size](ClientPool &client_pool) {
auto result = client_pool.Call<RemotePullRpc>(
dba.transaction_id(), dba.transaction().snapshot(), plan_id, params,
symbols, accumulate, batch_size, true, true);
auto handle_vertex = [&dba](auto &v) {
dba.db()
.remote_data_manager()
.Elements<Vertex>(dba.transaction_id())
.emplace(v.global_address.gid(), std::move(v.old_record),
std::move(v.new_record));
if (v.element_in_frame) {
VertexAccessor va(v.global_address, dba);
*v.element_in_frame = va;
}
};
auto handle_edge = [&dba](auto &e) {
dba.db()
.remote_data_manager()
.Elements<Edge>(dba.transaction_id())
.emplace(e.global_address.gid(), std::move(e.old_record),
std::move(e.new_record));
if (e.element_in_frame) {
EdgeAccessor ea(e.global_address, dba);
*e.element_in_frame = ea;
}
};
for (auto &v : result->data.vertices) handle_vertex(v);
for (auto &e : result->data.edges) handle_edge(e);
for (auto &p : result->data.paths) {
handle_vertex(p.vertices[0]);
p.path_in_frame =
query::Path(VertexAccessor(p.vertices[0].global_address, dba));
query::Path &path_in_frame = p.path_in_frame.ValuePath();
for (size_t i = 0; i < p.edges.size(); ++i) {
handle_edge(p.edges[i]);
path_in_frame.Expand(EdgeAccessor(p.edges[i].global_address, dba));
handle_vertex(p.vertices[i + 1]);
path_in_frame.Expand(
VertexAccessor(p.vertices[i + 1].global_address, dba));
}
}
return std::move(result->data.state_and_frames);
});
}
std::vector<utils::Future<void>>
RemotePullRpcClients::NotifyAllTransactionCommandAdvanced(
tx::transaction_id_t tx_id) {
return clients_.ExecuteOnWorkers<void>(0, [tx_id](auto &client) {
auto res = client.template Call<TransactionCommandAdvancedRpc>(tx_id);
CHECK(res) << "TransactionCommandAdvanceRpc failed";
});
}
} // namespace distributed

View File

@ -1,10 +1,8 @@
#pragma once #pragma once
#include <functional>
#include <vector> #include <vector>
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "distributed/remote_data_manager.hpp"
#include "distributed/remote_pull_produce_rpc_messages.hpp" #include "distributed/remote_pull_produce_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp" #include "distributed/rpc_worker_clients.hpp"
#include "query/frontend/semantic/symbol.hpp" #include "query/frontend/semantic/symbol.hpp"
@ -14,10 +12,10 @@
namespace distributed { namespace distributed {
/** Provides means of calling for the execution of a plan on some remote worker, /// Provides means of calling for the execution of a plan on some remote worker,
* and getting the results of that execution. The results are returned in /// and getting the results of that execution. The results are returned in
* batches and are therefore accompanied with an enum indicator of the state of /// batches and are therefore accompanied with an enum indicator of the state of
* remote execution. */ /// remote execution.
class RemotePullRpcClients { class RemotePullRpcClients {
using ClientPool = communication::rpc::ClientPool; using ClientPool = communication::rpc::ClientPool;
@ -34,68 +32,15 @@ class RemotePullRpcClients {
utils::Future<RemotePullData> RemotePull( utils::Future<RemotePullData> RemotePull(
database::GraphDbAccessor &dba, int worker_id, int64_t plan_id, database::GraphDbAccessor &dba, int worker_id, int64_t plan_id,
const Parameters &params, const std::vector<query::Symbol> &symbols, const Parameters &params, const std::vector<query::Symbol> &symbols,
bool accumulate, int batch_size = kDefaultBatchSize) { bool accumulate, int batch_size = kDefaultBatchSize);
return clients_.ExecuteOnWorker<RemotePullData>(
worker_id, [&dba, plan_id, params, symbols, accumulate,
batch_size](ClientPool &client_pool) {
auto result = client_pool.Call<RemotePullRpc>(
dba.transaction_id(), dba.transaction().snapshot(), plan_id,
params, symbols, accumulate, batch_size, true, true);
auto handle_vertex = [&dba](auto &v) {
dba.db()
.remote_data_manager()
.Vertices(dba.transaction_id())
.emplace(v.global_address.gid(), std::move(v.old_record),
std::move(v.new_record));
if (v.element_in_frame) {
VertexAccessor va(v.global_address, dba);
*v.element_in_frame = va;
}
};
auto handle_edge = [&dba](auto &e) {
dba.db()
.remote_data_manager()
.Edges(dba.transaction_id())
.emplace(e.global_address.gid(), std::move(e.old_record),
std::move(e.new_record));
if (e.element_in_frame) {
EdgeAccessor ea(e.global_address, dba);
*e.element_in_frame = ea;
}
};
for (auto &v : result->data.vertices) handle_vertex(v);
for (auto &e : result->data.edges) handle_edge(e);
for (auto &p : result->data.paths) {
handle_vertex(p.vertices[0]);
p.path_in_frame =
query::Path(VertexAccessor(p.vertices[0].global_address, dba));
query::Path &path_in_frame = p.path_in_frame.ValuePath();
for (size_t i = 0; i < p.edges.size(); ++i) {
handle_edge(p.edges[i]);
path_in_frame.Expand(
EdgeAccessor(p.edges[i].global_address, dba));
handle_vertex(p.vertices[i + 1]);
path_in_frame.Expand(
VertexAccessor(p.vertices[i + 1].global_address, dba));
}
}
return std::move(result->data.state_and_frames);
});
}
auto GetWorkerIds() { return clients_.GetWorkerIds(); } auto GetWorkerIds() { return clients_.GetWorkerIds(); }
std::vector<utils::Future<void>> NotifyAllTransactionCommandAdvanced( std::vector<utils::Future<void>> NotifyAllTransactionCommandAdvanced(
tx::transaction_id_t tx_id) { tx::transaction_id_t tx_id);
return clients_.ExecuteOnWorkers<void>(0, [tx_id](auto &client) {
auto res = client.template Call<TransactionCommandAdvancedRpc>(tx_id);
CHECK(res) << "TransactionCommandAdvanceRpc failed";
});
}
private: private:
RpcWorkerClients &clients_; RpcWorkerClients &clients_;
}; };
} // namespace distributed } // namespace distributed

View File

@ -0,0 +1,125 @@
#include <unordered_map>
#include <vector>
#include "distributed/remote_updates_rpc_clients.hpp"
#include "query/exceptions.hpp"
namespace distributed {
namespace {
void RaiseIfRemoteError(RemoteUpdateResult result) {
switch (result) {
case RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw query::RemoveAttachedVertexException();
case RemoteUpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError();
case RemoteUpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
"Remote LockTimeoutError during edge creation");
case RemoteUpdateResult::UPDATE_DELETED_ERROR:
throw RecordDeletedError();
case RemoteUpdateResult::DONE:
break;
}
}
}
RemoteUpdateResult RemoteUpdatesRpcClients::RemoteUpdate(
int worker_id, const database::StateDelta &delta) {
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteUpdateRpc>(delta);
CHECK(res) << "RemoteUpdateRpc failed on worker: " << worker_id;
return res->member;
}
gid::Gid RemoteUpdatesRpcClients::RemoteCreateVertex(
int worker_id, tx::transaction_id_t tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties) {
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteCreateVertexRpc>(
RemoteCreateVertexReqData{tx_id, labels, properties});
CHECK(res) << "RemoteCreateVertexRpc failed on worker: " << worker_id;
CHECK(res->member.result == RemoteUpdateResult::DONE)
<< "Remote Vertex creation result not RemoteUpdateResult::DONE";
return res->member.gid;
}
storage::EdgeAddress RemoteUpdatesRpcClients::RemoteCreateEdge(
tx::transaction_id_t tx_id, VertexAccessor &from, VertexAccessor &to,
storage::EdgeType edge_type) {
CHECK(from.address().is_remote())
<< "In RemoteCreateEdge `from` must be remote";
int from_worker = from.address().worker_id();
auto res = worker_clients_.GetClientPool(from_worker)
.Call<RemoteCreateEdgeRpc>(RemoteCreateEdgeReqData{
from.gid(), to.GlobalAddress(), edge_type, tx_id});
CHECK(res) << "RemoteCreateEdge RPC failed on worker: " << from_worker;
RaiseIfRemoteError(res->member.result);
return {res->member.gid, from_worker};
}
void RemoteUpdatesRpcClients::RemoteAddInEdge(tx::transaction_id_t tx_id,
VertexAccessor &from,
storage::EdgeAddress edge_address,
VertexAccessor &to,
storage::EdgeType edge_type) {
CHECK(to.address().is_remote() && edge_address.is_remote() &&
(from.GlobalAddress().worker_id() != to.address().worker_id()))
<< "RemoteAddInEdge should only be called when `to` is remote and "
"`from` is not on the same worker as `to`.";
auto worker_id = to.GlobalAddress().worker_id();
auto res = worker_clients_.GetClientPool(worker_id).Call<RemoteAddInEdgeRpc>(
RemoteAddInEdgeReqData{from.GlobalAddress(), edge_address, to.gid(),
edge_type, tx_id});
CHECK(res) << "RemoteAddInEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
void RemoteUpdatesRpcClients::RemoteRemoveVertex(int worker_id,
tx::transaction_id_t tx_id,
gid::Gid gid,
bool check_empty) {
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteRemoveVertexRpc>(
RemoteRemoveVertexReqData{gid, tx_id, check_empty});
CHECK(res) << "RemoteRemoveVertex RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
void RemoteUpdatesRpcClients::RemoteRemoveEdge(
tx::transaction_id_t tx_id, int worker_id, gid::Gid edge_gid,
gid::Gid vertex_from_id, storage::VertexAddress vertex_to_addr) {
auto res = worker_clients_.GetClientPool(worker_id).Call<RemoteRemoveEdgeRpc>(
RemoteRemoveEdgeData{tx_id, edge_gid, vertex_from_id, vertex_to_addr});
CHECK(res) << "RemoteRemoveEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
void RemoteUpdatesRpcClients::RemoteRemoveInEdge(
tx::transaction_id_t tx_id, int worker_id, gid::Gid vertex_id,
storage::EdgeAddress edge_address) {
CHECK(edge_address.is_remote())
<< "RemoteRemoveInEdge edge_address is local.";
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteRemoveInEdgeRpc>(
RemoteRemoveInEdgeData{tx_id, vertex_id, edge_address});
CHECK(res) << "RemoteRemoveInEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
std::vector<utils::Future<RemoteUpdateResult>>
RemoteUpdatesRpcClients::RemoteUpdateApplyAll(int skip_worker_id,
tx::transaction_id_t tx_id) {
return worker_clients_.ExecuteOnWorkers<RemoteUpdateResult>(
skip_worker_id, [tx_id](auto &client) {
auto res = client.template Call<RemoteUpdateApplyRpc>(tx_id);
CHECK(res) << "RemoteUpdateApplyRpc failed";
return res->member;
});
}
} // namespace distributed

View File

@ -6,7 +6,6 @@
#include "database/state_delta.hpp" #include "database/state_delta.hpp"
#include "distributed/remote_updates_rpc_messages.hpp" #include "distributed/remote_updates_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp" #include "distributed/rpc_worker_clients.hpp"
#include "query/exceptions.hpp"
#include "query/typed_value.hpp" #include "query/typed_value.hpp"
#include "storage/address_types.hpp" #include "storage/address_types.hpp"
#include "storage/gid.hpp" #include "storage/gid.hpp"
@ -26,27 +25,14 @@ class RemoteUpdatesRpcClients {
/// Sends an update delta to the given worker. /// Sends an update delta to the given worker.
RemoteUpdateResult RemoteUpdate(int worker_id, RemoteUpdateResult RemoteUpdate(int worker_id,
const database::StateDelta &delta) { const database::StateDelta &delta);
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteUpdateRpc>(delta);
CHECK(res) << "RemoteUpdateRpc failed on worker: " << worker_id;
return res->member;
}
/// Creates a vertex on the given worker and returns it's id. /// Creates a vertex on the given worker and returns it's id.
gid::Gid RemoteCreateVertex( gid::Gid RemoteCreateVertex(
int worker_id, tx::transaction_id_t tx_id, int worker_id, tx::transaction_id_t tx_id,
const std::vector<storage::Label> &labels, const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue> const std::unordered_map<storage::Property, query::TypedValue>
&properties) { &properties);
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteCreateVertexRpc>(
RemoteCreateVertexReqData{tx_id, labels, properties});
CHECK(res) << "RemoteCreateVertexRpc failed on worker: " << worker_id;
CHECK(res->member.result == RemoteUpdateResult::DONE)
<< "Remote Vertex creation result not RemoteUpdateResult::DONE";
return res->member.gid;
}
/// Creates an edge on the given worker and returns it's address. If the `to` /// Creates an edge on the given worker and returns it's address. If the `to`
/// vertex is on the same worker as `from`, then all remote CRUD will be /// vertex is on the same worker as `from`, then all remote CRUD will be
@ -56,45 +42,17 @@ class RemoteUpdatesRpcClients {
storage::EdgeAddress RemoteCreateEdge(tx::transaction_id_t tx_id, storage::EdgeAddress RemoteCreateEdge(tx::transaction_id_t tx_id,
VertexAccessor &from, VertexAccessor &from,
VertexAccessor &to, VertexAccessor &to,
storage::EdgeType edge_type) { storage::EdgeType edge_type);
CHECK(from.address().is_remote())
<< "In RemoteCreateEdge `from` must be remote";
int from_worker = from.address().worker_id();
auto res = worker_clients_.GetClientPool(from_worker)
.Call<RemoteCreateEdgeRpc>(RemoteCreateEdgeReqData{
from.gid(), to.GlobalAddress(), edge_type, tx_id});
CHECK(res) << "RemoteCreateEdge RPC failed on worker: " << from_worker;
RaiseIfRemoteError(res->member.result);
return {res->member.gid, from_worker};
}
/// Adds the edge with the given address to the `to` vertex as an incoming /// Adds the edge with the given address to the `to` vertex as an incoming
/// edge. Only used when `to` is remote and not on the same worker as `from`. /// edge. Only used when `to` is remote and not on the same worker as `from`.
void RemoteAddInEdge(tx::transaction_id_t tx_id, VertexAccessor &from, void RemoteAddInEdge(tx::transaction_id_t tx_id, VertexAccessor &from,
storage::EdgeAddress edge_address, VertexAccessor &to, storage::EdgeAddress edge_address, VertexAccessor &to,
storage::EdgeType edge_type) { storage::EdgeType edge_type);
CHECK(to.address().is_remote() && edge_address.is_remote() &&
(from.GlobalAddress().worker_id() != to.address().worker_id()))
<< "RemoteAddInEdge should only be called when `to` is remote and "
"`from` is not on the same worker as `to`.";
auto worker_id = to.GlobalAddress().worker_id();
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteAddInEdgeRpc>(
RemoteAddInEdgeReqData{from.GlobalAddress(), edge_address, to.gid(),
edge_type, tx_id});
CHECK(res) << "RemoteAddInEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
/// Removes a vertex from the other worker.
void RemoteRemoveVertex(int worker_id, tx::transaction_id_t tx_id, void RemoteRemoveVertex(int worker_id, tx::transaction_id_t tx_id,
gid::Gid gid, bool check_empty) { gid::Gid gid, bool check_empty);
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteRemoveVertexRpc>(
RemoteRemoveVertexReqData{gid, tx_id, check_empty});
CHECK(res) << "RemoteRemoveVertex RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
/// Removes an edge on another worker. This also handles the `from` vertex /// Removes an edge on another worker. This also handles the `from` vertex
/// outgoing edge, as that vertex is on the same worker as the edge. If the /// outgoing edge, as that vertex is on the same worker as the edge. If the
@ -103,56 +61,19 @@ class RemoteUpdatesRpcClients {
/// RemoteRemoveInEdge. /// RemoteRemoveInEdge.
void RemoteRemoveEdge(tx::transaction_id_t tx_id, int worker_id, void RemoteRemoveEdge(tx::transaction_id_t tx_id, int worker_id,
gid::Gid edge_gid, gid::Gid vertex_from_id, gid::Gid edge_gid, gid::Gid vertex_from_id,
storage::VertexAddress vertex_to_addr) { storage::VertexAddress vertex_to_addr);
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteRemoveEdgeRpc>(
RemoteRemoveEdgeData{tx_id, edge_gid, vertex_from_id,
vertex_to_addr});
CHECK(res) << "RemoteRemoveEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
void RemoteRemoveInEdge(tx::transaction_id_t tx_id, int worker_id, void RemoteRemoveInEdge(tx::transaction_id_t tx_id, int worker_id,
gid::Gid vertex_id, gid::Gid vertex_id,
storage::EdgeAddress edge_address) { storage::EdgeAddress edge_address);
CHECK(edge_address.is_remote())
<< "RemoteRemoveInEdge edge_address is local.";
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteRemoveInEdgeRpc>(
RemoteRemoveInEdgeData{tx_id, vertex_id, edge_address});
CHECK(res) << "RemoteRemoveInEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
/// Calls for all the workers (except the given one) to apply their updates /// Calls for all the workers (except the given one) to apply their updates
/// and returns the future results. /// and returns the future results.
std::vector<utils::Future<RemoteUpdateResult>> RemoteUpdateApplyAll( std::vector<utils::Future<RemoteUpdateResult>> RemoteUpdateApplyAll(
int skip_worker_id, tx::transaction_id_t tx_id) { int skip_worker_id, tx::transaction_id_t tx_id);
return worker_clients_.ExecuteOnWorkers<RemoteUpdateResult>(
skip_worker_id, [tx_id](auto &client) {
auto res = client.template Call<RemoteUpdateApplyRpc>(tx_id);
CHECK(res) << "RemoteUpdateApplyRpc failed";
return res->member;
});
}
private: private:
RpcWorkerClients &worker_clients_; RpcWorkerClients &worker_clients_;
void RaiseIfRemoteError(RemoteUpdateResult result) {
switch (result) {
case RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw query::RemoveAttachedVertexException();
case RemoteUpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError();
case RemoteUpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
"Remote LockTimeoutError during edge creation");
case RemoteUpdateResult::UPDATE_DELETED_ERROR:
throw RecordDeletedError();
case RemoteUpdateResult::DONE:
break;
}
}
}; };
} // namespace distributed } // namespace distributed

View File

@ -0,0 +1,362 @@
#include <utility>
#include "glog/logging.h"
#include "distributed/remote_updates_rpc_server.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
namespace distributed {
template <typename TRecordAccessor>
RemoteUpdateResult
RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Emplace(
const database::StateDelta &delta) {
auto gid = std::is_same<TRecordAccessor, VertexAccessor>::value
? delta.vertex_id
: delta.edge_id;
std::lock_guard<SpinLock> guard{lock_};
auto found = deltas_.find(gid);
if (found == deltas_.end()) {
found =
deltas_
.emplace(gid, std::make_pair(FindAccessor(gid),
std::vector<database::StateDelta>{}))
.first;
}
found->second.second.emplace_back(delta);
// TODO call `RecordAccessor::update` to force serialization errors to
// fail-fast (as opposed to when all the deltas get applied).
//
// This is problematic because `VersionList::update` needs to become
// thread-safe within the same transaction. Note that the concurrency is
// possible both between the owner worker interpretation thread and an RPC
// thread (current thread), as well as multiple RPC threads if this
// object's lock is released (perhaps desirable).
//
// A potential solution *might* be that `LockStore::Lock` returns a `bool`
// indicating if the caller was the one obtaining the lock (not the same
// as lock already being held by the same transaction).
//
// Another thing that needs to be done (if we do this) is ensuring that
// `LockStore::Take` is thread-safe when called in parallel in the same
// transaction. Currently it's thread-safe only when called in parallel
// from different transactions (only one manages to take the RecordLock).
//
// Deferring the implementation of this as it's tricky, and essentially an
// optimization.
//
// try {
// found->second.first.update();
// } catch (const mvcc::SerializationError &) {
// return RemoteUpdateResult::SERIALIZATION_ERROR;
// } catch (const RecordDeletedError &) {
// return RemoteUpdateResult::UPDATE_DELETED_ERROR;
// } catch (const LockTimeoutException &) {
// return RemoteUpdateResult::LOCK_TIMEOUT_ERROR;
// }
return RemoteUpdateResult::DONE;
}
template <typename TRecordAccessor>
gid::Gid
RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex(
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties) {
auto result = db_accessor_.InsertVertex();
for (auto &label : labels) result.add_label(label);
for (auto &kv : properties) result.PropsSet(kv.first, kv.second);
std::lock_guard<SpinLock> guard{lock_};
deltas_.emplace(result.gid(),
std::make_pair(result, std::vector<database::StateDelta>{}));
return result.gid();
}
template <typename TRecordAccessor>
gid::Gid
RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateEdge(
gid::Gid from, storage::VertexAddress to, storage::EdgeType edge_type) {
auto &db = db_accessor_.db();
auto edge = db_accessor_.InsertOnlyEdge(
{from, db.WorkerId()}, db.storage().LocalizedAddressIfPossible(to),
edge_type);
std::lock_guard<SpinLock> guard{lock_};
deltas_.emplace(edge.gid(),
std::make_pair(edge, std::vector<database::StateDelta>{}));
return edge.gid();
}
template <typename TRecordAccessor>
RemoteUpdateResult
RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
std::lock_guard<SpinLock> guard{lock_};
for (auto &kv : deltas_) {
auto &record_accessor = kv.second.first;
// We need to reconstruct the record as in the meantime some local
// update might have updated it.
record_accessor.Reconstruct();
for (database::StateDelta &delta : kv.second.second) {
try {
auto &dba = db_accessor_;
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_BEGIN:
case database::StateDelta::Type::TRANSACTION_COMMIT:
case database::StateDelta::Type::TRANSACTION_ABORT:
case database::StateDelta::Type::CREATE_VERTEX:
case database::StateDelta::Type::CREATE_EDGE:
case database::StateDelta::Type::BUILD_INDEX:
LOG(FATAL) << "Can only apply record update deltas for remote "
"graph element";
case database::StateDelta::Type::REMOVE_VERTEX:
if (!db_accessor().RemoveVertex(
reinterpret_cast<VertexAccessor &>(record_accessor),
delta.check_empty)) {
return RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR;
}
break;
case database::StateDelta::Type::SET_PROPERTY_VERTEX:
case database::StateDelta::Type::SET_PROPERTY_EDGE:
record_accessor.PropsSet(delta.property, delta.value);
break;
case database::StateDelta::Type::ADD_LABEL:
reinterpret_cast<VertexAccessor &>(record_accessor)
.add_label(delta.label);
break;
case database::StateDelta::Type::REMOVE_LABEL:
reinterpret_cast<VertexAccessor &>(record_accessor)
.remove_label(delta.label);
break;
case database::StateDelta::Type::ADD_OUT_EDGE:
reinterpret_cast<Vertex &>(record_accessor.update())
.out_.emplace(dba.db().storage().LocalizedAddressIfPossible(
delta.vertex_to_address),
dba.db().storage().LocalizedAddressIfPossible(
delta.edge_address),
delta.edge_type);
dba.wal().Emplace(delta);
break;
case database::StateDelta::Type::ADD_IN_EDGE:
reinterpret_cast<Vertex &>(record_accessor.update())
.in_.emplace(dba.db().storage().LocalizedAddressIfPossible(
delta.vertex_from_address),
dba.db().storage().LocalizedAddressIfPossible(
delta.edge_address),
delta.edge_type);
dba.wal().Emplace(delta);
break;
case database::StateDelta::Type::REMOVE_EDGE:
// We only remove the edge as a result of this StateDelta,
// because the removal of edge from vertex in/out is performed
// in REMOVE_[IN/OUT]_EDGE deltas.
db_accessor_.RemoveEdge(
reinterpret_cast<EdgeAccessor &>(record_accessor), false,
false);
break;
case database::StateDelta::Type::REMOVE_OUT_EDGE:
reinterpret_cast<VertexAccessor &>(record_accessor)
.RemoveOutEdge(delta.edge_address);
break;
case database::StateDelta::Type::REMOVE_IN_EDGE:
reinterpret_cast<VertexAccessor &>(record_accessor)
.RemoveInEdge(delta.edge_address);
break;
}
} catch (const mvcc::SerializationError &) {
return RemoteUpdateResult::SERIALIZATION_ERROR;
} catch (const RecordDeletedError &) {
return RemoteUpdateResult::UPDATE_DELETED_ERROR;
} catch (const LockTimeoutException &) {
return RemoteUpdateResult::LOCK_TIMEOUT_ERROR;
}
}
}
return RemoteUpdateResult::DONE;
}
RemoteUpdatesRpcServer::RemoteUpdatesRpcServer(
database::GraphDb &db, communication::rpc::Server &server)
: db_(db) {
server.Register<RemoteUpdateRpc>([this](const RemoteUpdateReq &req) {
using DeltaType = database::StateDelta::Type;
auto &delta = req.member;
switch (delta.type) {
case DeltaType::SET_PROPERTY_VERTEX:
case DeltaType::ADD_LABEL:
case DeltaType::REMOVE_LABEL:
case database::StateDelta::Type::REMOVE_OUT_EDGE:
case database::StateDelta::Type::REMOVE_IN_EDGE:
return std::make_unique<RemoteUpdateRes>(
GetUpdates(vertex_updates_, delta.transaction_id).Emplace(delta));
case DeltaType::SET_PROPERTY_EDGE:
return std::make_unique<RemoteUpdateRes>(
GetUpdates(edge_updates_, delta.transaction_id).Emplace(delta));
default:
LOG(FATAL) << "Can't perform a remote update with delta type: "
<< static_cast<int>(req.member.type);
}
});
server.Register<RemoteUpdateApplyRpc>(
[this](const RemoteUpdateApplyReq &req) {
return std::make_unique<RemoteUpdateApplyRes>(Apply(req.member));
});
server.Register<RemoteCreateVertexRpc>([this](
const RemoteCreateVertexReq &req) {
gid::Gid gid = GetUpdates(vertex_updates_, req.member.tx_id)
.CreateVertex(req.member.labels, req.member.properties);
return std::make_unique<RemoteCreateVertexRes>(
RemoteCreateResult{RemoteUpdateResult::DONE, gid});
});
server.Register<RemoteCreateEdgeRpc>([this](const RemoteCreateEdgeReq &req) {
auto data = req.member;
auto creation_result = CreateEdge(data);
// If `from` and `to` are both on this worker, we handle it in this
// RPC call. Do it only if CreateEdge succeeded.
if (creation_result.result == RemoteUpdateResult::DONE &&
data.to.worker_id() == db_.WorkerId()) {
auto to_delta = database::StateDelta::AddInEdge(
data.tx_id, data.to.gid(), {data.from, db_.WorkerId()},
{creation_result.gid, db_.WorkerId()}, data.edge_type);
creation_result.result =
GetUpdates(vertex_updates_, data.tx_id).Emplace(to_delta);
}
return std::make_unique<RemoteCreateEdgeRes>(creation_result);
});
server.Register<RemoteAddInEdgeRpc>([this](const RemoteAddInEdgeReq &req) {
auto to_delta = database::StateDelta::AddInEdge(
req.member.tx_id, req.member.to, req.member.from,
req.member.edge_address, req.member.edge_type);
auto result =
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
return std::make_unique<RemoteAddInEdgeRes>(result);
});
server.Register<RemoteRemoveVertexRpc>(
[this](const RemoteRemoveVertexReq &req) {
auto to_delta = database::StateDelta::RemoveVertex(
req.member.tx_id, req.member.gid, req.member.check_empty);
auto result =
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
return std::make_unique<RemoteRemoveVertexRes>(result);
});
server.Register<RemoteRemoveEdgeRpc>([this](const RemoteRemoveEdgeReq &req) {
return std::make_unique<RemoteRemoveEdgeRes>(RemoveEdge(req.member));
});
server.Register<RemoteRemoveInEdgeRpc>(
[this](const RemoteRemoveInEdgeReq &req) {
auto data = req.member;
return std::make_unique<RemoteRemoveInEdgeRes>(
GetUpdates(vertex_updates_, data.tx_id)
.Emplace(database::StateDelta::RemoveInEdge(
data.tx_id, data.vertex, data.edge_address)));
});
}
RemoteUpdateResult RemoteUpdatesRpcServer::Apply(tx::transaction_id_t tx_id) {
auto apply = [tx_id](auto &collection) {
auto access = collection.access();
auto found = access.find(tx_id);
if (found == access.end()) {
return RemoteUpdateResult::DONE;
}
auto result = found->second.Apply();
access.remove(tx_id);
return result;
};
auto vertex_result = apply(vertex_updates_);
auto edge_result = apply(edge_updates_);
if (vertex_result != RemoteUpdateResult::DONE) return vertex_result;
if (edge_result != RemoteUpdateResult::DONE) return edge_result;
return RemoteUpdateResult::DONE;
}
void RemoteUpdatesRpcServer::ClearTransactionalCache(
tx::transaction_id_t oldest_active) {
auto vertex_access = vertex_updates_.access();
for (auto &kv : vertex_access) {
if (kv.first < oldest_active) {
vertex_access.remove(kv.first);
}
}
auto edge_access = edge_updates_.access();
for (auto &kv : edge_access) {
if (kv.first < oldest_active) {
edge_access.remove(kv.first);
}
}
}
// Gets/creates the TransactionUpdates for the given transaction.
template <typename TAccessor>
RemoteUpdatesRpcServer::TransactionUpdates<TAccessor>
&RemoteUpdatesRpcServer::GetUpdates(MapT<TAccessor> &updates,
tx::transaction_id_t tx_id) {
return updates.access()
.emplace(tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(db_), tx_id))
.first->second;
}
RemoteCreateResult RemoteUpdatesRpcServer::CreateEdge(
const RemoteCreateEdgeReqData &req) {
auto gid = GetUpdates(edge_updates_, req.tx_id)
.CreateEdge(req.from, req.to, req.edge_type);
auto from_delta = database::StateDelta::AddOutEdge(
req.tx_id, req.from, req.to, {gid, db_.WorkerId()}, req.edge_type);
auto result = GetUpdates(vertex_updates_, req.tx_id).Emplace(from_delta);
return {result, gid};
}
RemoteUpdateResult RemoteUpdatesRpcServer::RemoveEdge(
const RemoteRemoveEdgeData &data) {
// Edge removal.
auto deletion_delta =
database::StateDelta::RemoveEdge(data.tx_id, data.edge_id);
auto result = GetUpdates(edge_updates_, data.tx_id).Emplace(deletion_delta);
// Out-edge removal, for sure is local.
if (result == RemoteUpdateResult::DONE) {
auto remove_out_delta = database::StateDelta::RemoveOutEdge(
data.tx_id, data.vertex_from_id, {data.edge_id, db_.WorkerId()});
result = GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_out_delta);
}
// In-edge removal, might not be local.
if (result == RemoteUpdateResult::DONE &&
data.vertex_to_address.worker_id() == db_.WorkerId()) {
auto remove_in_delta = database::StateDelta::RemoveInEdge(
data.tx_id, data.vertex_to_address.gid(),
{data.edge_id, db_.WorkerId()});
result = GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_in_delta);
}
return result;
}
template <>
VertexAccessor
RemoteUpdatesRpcServer::TransactionUpdates<VertexAccessor>::FindAccessor(
gid::Gid gid) {
return db_accessor_.FindVertexChecked(gid, false);
}
template <>
EdgeAccessor
RemoteUpdatesRpcServer::TransactionUpdates<EdgeAccessor>::FindAccessor(
gid::Gid gid) {
return db_accessor_.FindEdgeChecked(gid, false);
}
} // namespace distributed

View File

@ -1,8 +1,6 @@
#pragma once #pragma once
#include <mutex>
#include <unordered_map> #include <unordered_map>
#include <utility>
#include <vector> #include <vector>
#include "glog/logging.h" #include "glog/logging.h"
@ -13,13 +11,11 @@
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp" #include "database/state_delta.hpp"
#include "distributed/remote_updates_rpc_messages.hpp" #include "distributed/remote_updates_rpc_messages.hpp"
#include "mvcc/version_list.hpp"
#include "query/typed_value.hpp" #include "query/typed_value.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/gid.hpp" #include "storage/gid.hpp"
#include "storage/record_accessor.hpp"
#include "storage/types.hpp" #include "storage/types.hpp"
#include "storage/vertex_accessor.hpp" #include "storage/vertex_accessor.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
#include "threading/sync/spinlock.hpp" #include "threading/sync/spinlock.hpp"
#include "transactions/type.hpp" #include "transactions/type.hpp"
@ -42,170 +38,21 @@ class RemoteUpdatesRpcServer {
/// Adds a delta and returns the result. Does not modify the state (data) of /// Adds a delta and returns the result. Does not modify the state (data) of
/// the graph element the update is for, but calls the `update` method to /// the graph element the update is for, but calls the `update` method to
/// fail-fast on serialization and update-after-delete errors. /// fail-fast on serialization and update-after-delete errors.
RemoteUpdateResult Emplace(const database::StateDelta &delta) { RemoteUpdateResult Emplace(const database::StateDelta &delta);
auto gid = std::is_same<TRecordAccessor, VertexAccessor>::value
? delta.vertex_id
: delta.edge_id;
std::lock_guard<SpinLock> guard{lock_};
auto found = deltas_.find(gid);
if (found == deltas_.end()) {
found = deltas_
.emplace(gid, std::make_pair(
FindAccessor(gid),
std::vector<database::StateDelta>{}))
.first;
}
found->second.second.emplace_back(delta);
// TODO call `RecordAccessor::update` to force serialization errors to
// fail-fast (as opposed to when all the deltas get applied).
//
// This is problematic because `VersionList::update` needs to become
// thread-safe within the same transaction. Note that the concurrency is
// possible both between the owner worker interpretation thread and an RPC
// thread (current thread), as well as multiple RPC threads if this
// object's lock is released (perhaps desirable).
//
// A potential solution *might* be that `LockStore::Lock` returns a `bool`
// indicating if the caller was the one obtaining the lock (not the same
// as lock already being held by the same transaction).
//
// Another thing that needs to be done (if we do this) is ensuring that
// `LockStore::Take` is thread-safe when called in parallel in the same
// transaction. Currently it's thread-safe only when called in parallel
// from different transactions (only one manages to take the RecordLock).
//
// Deferring the implementation of this as it's tricky, and essentially an
// optimization.
//
// try {
// found->second.first.update();
// } catch (const mvcc::SerializationError &) {
// return RemoteUpdateResult::SERIALIZATION_ERROR;
// } catch (const RecordDeletedError &) {
// return RemoteUpdateResult::UPDATE_DELETED_ERROR;
// } catch (const LockTimeoutException &) {
// return RemoteUpdateResult::LOCK_TIMEOUT_ERROR;
// }
return RemoteUpdateResult::DONE;
}
/// Creates a new vertex and returns it's gid. /// Creates a new vertex and returns it's gid.
gid::Gid CreateVertex( gid::Gid CreateVertex(
const std::vector<storage::Label> &labels, const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue> const std::unordered_map<storage::Property, query::TypedValue>
&properties) { &properties);
auto result = db_accessor_.InsertVertex();
for (auto &label : labels) result.add_label(label);
for (auto &kv : properties) result.PropsSet(kv.first, kv.second);
std::lock_guard<SpinLock> guard{lock_};
deltas_.emplace(
result.gid(),
std::make_pair(result, std::vector<database::StateDelta>{}));
return result.gid();
}
/// Creates a new edge and returns it's gid. Does not update vertices at the /// Creates a new edge and returns it's gid. Does not update vertices at the
/// end of the edge. /// end of the edge.
gid::Gid CreateEdge(gid::Gid from, storage::VertexAddress to, gid::Gid CreateEdge(gid::Gid from, storage::VertexAddress to,
storage::EdgeType edge_type) { storage::EdgeType edge_type);
auto &db = db_accessor_.db();
auto edge = db_accessor_.InsertOnlyEdge(
{from, db.WorkerId()}, db.storage().LocalizedAddressIfPossible(to),
edge_type);
std::lock_guard<SpinLock> guard{lock_};
deltas_.emplace(
edge.gid(),
std::make_pair(edge, std::vector<database::StateDelta>{}));
return edge.gid();
}
/// Applies all the deltas on the record. /// Applies all the deltas on the record.
RemoteUpdateResult Apply() { RemoteUpdateResult Apply();
std::lock_guard<SpinLock> guard{lock_};
for (auto &kv : deltas_) {
auto &record_accessor = kv.second.first;
// We need to reconstruct the record as in the meantime some local
// update might have updated it.
record_accessor.Reconstruct();
for (database::StateDelta &delta : kv.second.second) {
try {
auto &dba = db_accessor_;
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_BEGIN:
case database::StateDelta::Type::TRANSACTION_COMMIT:
case database::StateDelta::Type::TRANSACTION_ABORT:
case database::StateDelta::Type::CREATE_VERTEX:
case database::StateDelta::Type::CREATE_EDGE:
case database::StateDelta::Type::BUILD_INDEX:
LOG(FATAL) << "Can only apply record update deltas for remote "
"graph element";
case database::StateDelta::Type::REMOVE_VERTEX:
if (!db_accessor().RemoveVertex(
reinterpret_cast<VertexAccessor &>(record_accessor),
delta.check_empty)) {
return RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR;
}
break;
case database::StateDelta::Type::SET_PROPERTY_VERTEX:
case database::StateDelta::Type::SET_PROPERTY_EDGE:
record_accessor.PropsSet(delta.property, delta.value);
break;
case database::StateDelta::Type::ADD_LABEL:
reinterpret_cast<VertexAccessor &>(record_accessor)
.add_label(delta.label);
break;
case database::StateDelta::Type::REMOVE_LABEL:
reinterpret_cast<VertexAccessor &>(record_accessor)
.remove_label(delta.label);
break;
case database::StateDelta::Type::ADD_OUT_EDGE:
reinterpret_cast<Vertex &>(record_accessor.update())
.out_.emplace(dba.db().storage().LocalizedAddressIfPossible(
delta.vertex_to_address),
dba.db().storage().LocalizedAddressIfPossible(
delta.edge_address),
delta.edge_type);
dba.wal().Emplace(delta);
break;
case database::StateDelta::Type::ADD_IN_EDGE:
reinterpret_cast<Vertex &>(record_accessor.update())
.in_.emplace(dba.db().storage().LocalizedAddressIfPossible(
delta.vertex_from_address),
dba.db().storage().LocalizedAddressIfPossible(
delta.edge_address),
delta.edge_type);
dba.wal().Emplace(delta);
break;
case database::StateDelta::Type::REMOVE_EDGE:
// We only remove the edge as a result of this StateDelta,
// because the removal of edge from vertex in/out is performed
// in REMOVE_[IN/OUT]_EDGE deltas.
db_accessor_.RemoveEdge(
reinterpret_cast<EdgeAccessor &>(record_accessor), false,
false);
break;
case database::StateDelta::Type::REMOVE_OUT_EDGE:
reinterpret_cast<VertexAccessor &>(record_accessor)
.RemoveOutEdge(delta.edge_address);
break;
case database::StateDelta::Type::REMOVE_IN_EDGE:
reinterpret_cast<VertexAccessor &>(record_accessor)
.RemoveInEdge(delta.edge_address);
break;
}
} catch (const mvcc::SerializationError &) {
return RemoteUpdateResult::SERIALIZATION_ERROR;
} catch (const RecordDeletedError &) {
return RemoteUpdateResult::UPDATE_DELETED_ERROR;
} catch (const LockTimeoutException &) {
return RemoteUpdateResult::LOCK_TIMEOUT_ERROR;
}
}
}
return RemoteUpdateResult::DONE;
}
auto &db_accessor() { return db_accessor_; } auto &db_accessor() { return db_accessor_; }
@ -223,132 +70,16 @@ class RemoteUpdatesRpcServer {
public: public:
RemoteUpdatesRpcServer(database::GraphDb &db, RemoteUpdatesRpcServer(database::GraphDb &db,
communication::rpc::Server &server) communication::rpc::Server &server);
: db_(db) {
server.Register<RemoteUpdateRpc>([this](const RemoteUpdateReq &req) {
using DeltaType = database::StateDelta::Type;
auto &delta = req.member;
switch (delta.type) {
case DeltaType::SET_PROPERTY_VERTEX:
case DeltaType::ADD_LABEL:
case DeltaType::REMOVE_LABEL:
case database::StateDelta::Type::REMOVE_OUT_EDGE:
case database::StateDelta::Type::REMOVE_IN_EDGE:
return std::make_unique<RemoteUpdateRes>(
GetUpdates(vertex_updates_, delta.transaction_id).Emplace(delta));
case DeltaType::SET_PROPERTY_EDGE:
return std::make_unique<RemoteUpdateRes>(
GetUpdates(edge_updates_, delta.transaction_id).Emplace(delta));
default:
LOG(FATAL) << "Can't perform a remote update with delta type: "
<< static_cast<int>(req.member.type);
}
});
server.Register<RemoteUpdateApplyRpc>(
[this](const RemoteUpdateApplyReq &req) {
return std::make_unique<RemoteUpdateApplyRes>(Apply(req.member));
});
server.Register<RemoteCreateVertexRpc>(
[this](const RemoteCreateVertexReq &req) {
gid::Gid gid =
GetUpdates(vertex_updates_, req.member.tx_id)
.CreateVertex(req.member.labels, req.member.properties);
return std::make_unique<RemoteCreateVertexRes>(
RemoteCreateResult{RemoteUpdateResult::DONE, gid});
});
server.Register<RemoteCreateEdgeRpc>(
[this](const RemoteCreateEdgeReq &req) {
auto data = req.member;
auto creation_result = CreateEdge(data);
// If `from` and `to` are both on this worker, we handle it in this
// RPC call. Do it only if CreateEdge succeeded.
if (creation_result.result == RemoteUpdateResult::DONE &&
data.to.worker_id() == db_.WorkerId()) {
auto to_delta = database::StateDelta::AddInEdge(
data.tx_id, data.to.gid(), {data.from, db_.WorkerId()},
{creation_result.gid, db_.WorkerId()}, data.edge_type);
creation_result.result =
GetUpdates(vertex_updates_, data.tx_id).Emplace(to_delta);
}
return std::make_unique<RemoteCreateEdgeRes>(creation_result);
});
server.Register<RemoteAddInEdgeRpc>([this](const RemoteAddInEdgeReq &req) {
auto to_delta = database::StateDelta::AddInEdge(
req.member.tx_id, req.member.to, req.member.from,
req.member.edge_address, req.member.edge_type);
auto result =
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
return std::make_unique<RemoteAddInEdgeRes>(result);
});
server.Register<RemoteRemoveVertexRpc>(
[this](const RemoteRemoveVertexReq &req) {
auto to_delta = database::StateDelta::RemoveVertex(
req.member.tx_id, req.member.gid, req.member.check_empty);
auto result =
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
return std::make_unique<RemoteRemoveVertexRes>(result);
});
server.Register<RemoteRemoveEdgeRpc>(
[this](const RemoteRemoveEdgeReq &req) {
return std::make_unique<RemoteRemoveEdgeRes>(RemoveEdge(req.member));
});
server.Register<RemoteRemoveInEdgeRpc>(
[this](const RemoteRemoveInEdgeReq &req) {
auto data = req.member;
return std::make_unique<RemoteRemoveInEdgeRes>(
GetUpdates(vertex_updates_, data.tx_id)
.Emplace(database::StateDelta::RemoveInEdge(
data.tx_id, data.vertex, data.edge_address)));
});
}
/// Applies all existsing updates for the given transaction ID. If there are /// Applies all existsing updates for the given transaction ID. If there are
/// no updates for that transaction, nothing happens. Clears the updates cache /// no updates for that transaction, nothing happens. Clears the updates cache
/// after applying them, regardless of the result. /// after applying them, regardless of the result.
RemoteUpdateResult Apply(tx::transaction_id_t tx_id) { RemoteUpdateResult Apply(tx::transaction_id_t tx_id);
auto apply = [tx_id](auto &collection) {
auto access = collection.access();
auto found = access.find(tx_id);
if (found == access.end()) {
return RemoteUpdateResult::DONE;
}
auto result = found->second.Apply();
access.remove(tx_id);
return result;
};
auto vertex_result = apply(vertex_updates_);
auto edge_result = apply(edge_updates_);
if (vertex_result != RemoteUpdateResult::DONE) return vertex_result;
if (edge_result != RemoteUpdateResult::DONE) return edge_result;
return RemoteUpdateResult::DONE;
}
/// Clears the cache of local transactions that have expired. The signature of /// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::CacheCleaner`. /// this method is dictated by `distributed::CacheCleaner`.
void ClearTransactionalCache(tx::transaction_id_t oldest_active) { void ClearTransactionalCache(tx::transaction_id_t oldest_active);
auto vertex_access = vertex_updates_.access();
for (auto &kv : vertex_access) {
if (kv.first < oldest_active) {
vertex_access.remove(kv.first);
}
}
auto edge_access = edge_updates_.access();
for (auto &kv : edge_access) {
if (kv.first < oldest_active) {
edge_access.remove(kv.first);
}
}
}
private: private:
database::GraphDb &db_; database::GraphDb &db_;
@ -362,62 +93,13 @@ class RemoteUpdatesRpcServer {
// Gets/creates the TransactionUpdates for the given transaction. // Gets/creates the TransactionUpdates for the given transaction.
template <typename TAccessor> template <typename TAccessor>
TransactionUpdates<TAccessor> &GetUpdates(MapT<TAccessor> &updates, TransactionUpdates<TAccessor> &GetUpdates(MapT<TAccessor> &updates,
tx::transaction_id_t tx_id) { tx::transaction_id_t tx_id);
return updates.access()
.emplace(tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(db_), tx_id))
.first->second;
}
RemoteCreateResult CreateEdge(const RemoteCreateEdgeReqData &req) { // Performs edge creation for the given request.
auto gid = GetUpdates(edge_updates_, req.tx_id) RemoteCreateResult CreateEdge(const RemoteCreateEdgeReqData &req);
.CreateEdge(req.from, req.to, req.edge_type);
auto from_delta = database::StateDelta::AddOutEdge( // Performs edge removal for the given request.
req.tx_id, req.from, req.to, {gid, db_.WorkerId()}, req.edge_type); RemoteUpdateResult RemoveEdge(const RemoteRemoveEdgeData &data);
auto result = GetUpdates(vertex_updates_, req.tx_id).Emplace(from_delta);
return {result, gid};
}
RemoteUpdateResult RemoveEdge(const RemoteRemoveEdgeData &data) {
// Edge removal.
auto deletion_delta =
database::StateDelta::RemoveEdge(data.tx_id, data.edge_id);
auto result = GetUpdates(edge_updates_, data.tx_id).Emplace(deletion_delta);
// Out-edge removal, for sure is local.
if (result == RemoteUpdateResult::DONE) {
auto remove_out_delta = database::StateDelta::RemoveOutEdge(
data.tx_id, data.vertex_from_id, {data.edge_id, db_.WorkerId()});
result =
GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_out_delta);
}
// In-edge removal, might not be local.
if (result == RemoteUpdateResult::DONE &&
data.vertex_to_address.worker_id() == db_.WorkerId()) {
auto remove_in_delta = database::StateDelta::RemoveInEdge(
data.tx_id, data.vertex_to_address.gid(),
{data.edge_id, db_.WorkerId()});
result = GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_in_delta);
}
return result;
}
}; };
template <>
inline VertexAccessor
RemoteUpdatesRpcServer::TransactionUpdates<VertexAccessor>::FindAccessor(
gid::Gid gid) {
return db_accessor_.FindVertexChecked(gid, false);
}
template <>
inline EdgeAccessor
RemoteUpdatesRpcServer::TransactionUpdates<EdgeAccessor>::FindAccessor(
gid::Gid gid) {
return db_accessor_.FindEdgeChecked(gid, false);
}
} // namespace distributed } // namespace distributed

View File

@ -4,6 +4,7 @@
#include "database/state_delta.hpp" #include "database/state_delta.hpp"
#include "distributed/remote_data_manager.hpp" #include "distributed/remote_data_manager.hpp"
#include "distributed/remote_updates_rpc_clients.hpp" #include "distributed/remote_updates_rpc_clients.hpp"
#include "query/exceptions.hpp"
#include "storage/edge.hpp" #include "storage/edge.hpp"
#include "storage/record_accessor.hpp" #include "storage/record_accessor.hpp"
#include "storage/vertex.hpp" #include "storage/vertex.hpp"