Add distributed Id Cypher function

Reviewers: msantl, vkasljevic, teon.banek, ipaljak

Reviewed By: msantl, teon.banek

Subscribers: pullbot, teon.banek

Differential Revision: https://phabricator.memgraph.io/D1477
This commit is contained in:
Marko Budiselic 2018-08-28 16:28:35 +02:00
parent e80c49f856
commit d9153921b8
40 changed files with 832 additions and 632 deletions

View File

@ -92,8 +92,7 @@ class DistributedRecordAccessor final {
if (address.is_local()) {
return address.local()->update(dba.transaction());
}
return data_manager_->FindNew<TRecord>(dba.transaction_id(),
address.gid());
return data_manager_->FindNew<TRecord>(dba.transaction_id(), address.gid());
}
void ProcessDelta(const RecordAccessor<TRecord> &record_accessor,
@ -122,6 +121,25 @@ class DistributedRecordAccessor final {
throw utils::LockTimeoutException("Lock timeout on remote worker");
}
}
/// Returns the cypher id of the record behind `record_accessor`.
///
/// A local record is asked directly through its local address. For a remote
/// record the id is read from the entry returned by `data_manager_->Find`.
int64_t CypherId(const RecordAccessor<TRecord> &record_accessor) {
  auto &accessor = record_accessor.db_accessor();
  const auto &record_address = record_accessor.address();

  // Local record: no cache lookup needed.
  if (record_accessor.is_local()) {
    return record_address.local()->cypher_id();
  }

  // Remote record: take the id from the cached copy.
  //
  // NOTE: This path is taken when a vertex has to be migrated and it has
  // edges that don't belong to it. The machine that owns the vertex
  // initiates the remote edge creation, and for that call it needs to know
  // the cypher_id of each remote edge.
  // TODO (buda): If we save cypher_id similar/next to edge_type we would save
  // a network call.
  const auto &cached_record = data_manager_->Find<TRecord>(
      accessor.transaction().id_, record_address.worker_id(),
      record_address.gid());
  return cached_record.cypher_id;
}
};
class DistributedEdgeAccessor final : public ::RecordAccessor<Edge>::Impl {
@ -150,6 +168,10 @@ class DistributedEdgeAccessor final : public ::RecordAccessor<Edge>::Impl {
const database::StateDelta &delta) override {
return distributed_accessor_.ProcessDelta(ra, delta);
}
/// Returns the cypher id of the edge behind `ra` by forwarding the call to
/// `distributed_accessor_`.
int64_t CypherId(const RecordAccessor<Edge> &ra) override {
return distributed_accessor_.CypherId(ra);
}
};
class DistributedVertexAccessor final : public ::VertexAccessor::Impl {
@ -216,6 +238,10 @@ class DistributedVertexAccessor final : public ::VertexAccessor::Impl {
if (!va.is_local()) distributed_accessor_.SendDelta(va, delta);
}
/// Returns the cypher id of the vertex behind `ra` by forwarding the call to
/// `distributed_accessor_`.
int64_t CypherId(const RecordAccessor<Vertex> &ra) override {
return distributed_accessor_.CypherId(ra);
}
};
//////////////////////////////////////////////////////////////////////
@ -297,14 +323,17 @@ class DistributedAccessor : public GraphDbAccessor {
return GraphDbAccessor::InsertEdgeOnFrom(from, to, edge_type,
requested_gid, cypher_id);
}
auto edge_address =
updates_clients_->CreateEdge(transaction_id(), *from, *to, edge_type);
auto created_edge_info = updates_clients_->CreateEdge(
transaction_id(), *from, *to, edge_type, cypher_id);
auto edge_address = created_edge_info.edge_address;
auto *from_updated =
data_manager_->FindNew<Vertex>(transaction_id(), from->gid());
// Create an Edge and insert it into the Cache so we see it locally.
data_manager_->Emplace<Edge>(
transaction_id(), edge_address.gid(), nullptr,
std::make_unique<Edge>(from->address(), to->address(), edge_type));
transaction_id(), edge_address.gid(),
distributed::CachedRecordData<Edge>(
created_edge_info.cypher_id, nullptr,
std::make_unique<Edge>(from->address(), to->address(), edge_type)));
from_updated->out_.emplace(
db().storage().LocalizedAddressIfPossible(to->address()), edge_address,
edge_type);
@ -810,8 +839,8 @@ distributed::IndexRpcClients &Master::index_rpc_clients() {
VertexAccessor InsertVertexIntoRemote(
GraphDbAccessor *dba, int worker_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties) {
const std::unordered_map<storage::Property, query::TypedValue> &properties,
std::experimental::optional<int64_t> cypher_id) {
// TODO: Replace this with virtual call or some other mechanism.
auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba->db());
@ -821,13 +850,16 @@ VertexAccessor InsertVertexIntoRemote(
auto *updates_clients = &distributed_db->updates_clients();
auto *data_manager = &distributed_db->data_manager();
CHECK(updates_clients && data_manager);
gid::Gid gid = updates_clients->CreateVertex(worker_id, dba->transaction_id(),
labels, properties);
auto created_vertex_info = updates_clients->CreateVertex(
worker_id, dba->transaction_id(), labels, properties, cypher_id);
auto vertex = std::make_unique<Vertex>();
vertex->labels_ = labels;
for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second);
data_manager->Emplace<Vertex>(dba->transaction_id(), gid, nullptr, std::move(vertex));
return VertexAccessor({gid, worker_id}, *dba);
data_manager->Emplace<Vertex>(
dba->transaction_id(), created_vertex_info.gid,
distributed::CachedRecordData<Vertex>(created_vertex_info.cypher_id,
nullptr, std::move(vertex)));
return VertexAccessor({created_vertex_info.gid, worker_id}, *dba);
}
//////////////////////////////////////////////////////////////////////

View File

@ -132,6 +132,7 @@ class Worker final : public DistributedGraphDb {
VertexAccessor InsertVertexIntoRemote(
GraphDbAccessor *dba, int worker_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue> &properties);
const std::unordered_map<storage::Property, query::TypedValue> &properties,
std::experimental::optional<int64_t> cypher_id);
} // namespace database

View File

@ -54,6 +54,10 @@ class SingleNodeRecordAccessor final {
CHECK(record_accessor.is_local());
record_accessor.db_accessor().wal().Emplace(delta);
}
/// Returns the cypher id obtained through the accessor's local address.
/// The address is dereferenced via `local()` without a locality check here.
int64_t CypherId(const RecordAccessor<TRecord> &record_accessor) {
return record_accessor.address().local()->cypher_id();
}
};
class VertexAccessorImpl final : public ::VertexAccessor::Impl {
@ -109,6 +113,10 @@ class VertexAccessorImpl final : public ::VertexAccessor::Impl {
dba.wal().Emplace(delta);
}
}
/// Returns the cypher id of the vertex behind `ra`; delegates to `accessor_`.
int64_t CypherId(const RecordAccessor<Vertex> &ra) override {
return accessor_.CypherId(ra);
}
};
class EdgeAccessorImpl final : public ::RecordAccessor<Edge>::Impl {
@ -133,6 +141,10 @@ class EdgeAccessorImpl final : public ::RecordAccessor<Edge>::Impl {
const database::StateDelta &delta) override {
return accessor_.ProcessDelta(ra, delta);
}
/// Returns the cypher id of the edge behind `ra`; delegates to `accessor_`.
int64_t CypherId(const RecordAccessor<Edge> &ra) override {
return accessor_.CypherId(ra);
}
};
class SingleNodeAccessor : public GraphDbAccessor {

View File

@ -359,7 +359,7 @@ storage::EdgeAddress GraphDbAccessor::InsertEdgeOnFrom(
// `CREATE_EDGE`, but always have it split into 3 parts (edge insertion,
// in/out modification).
wal().Emplace(database::StateDelta::CreateEdge(
transaction_.id_, edge_accessor.gid(), edge_accessor.cypher_id(),
transaction_.id_, edge_accessor.gid(), edge_accessor.CypherId(),
from->gid(), to->gid(), edge_type, EdgeTypeName(edge_type)));
from_updated->out_.emplace(
@ -371,9 +371,9 @@ storage::EdgeAddress GraphDbAccessor::InsertEdgeOnFrom(
void GraphDbAccessor::InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const storage::EdgeAddress &edge_address) {
// ensure that the "to" accessor has the latest version (Switch new)
// WARNING: must do that after the above "from->update()" for cases when
// we are creating a cycle and "from" and "to" are the same vlist
// Ensure that the "to" accessor has the latest version (switch new).
// WARNING: Must do that after the above "from->update()" for cases when
// we are creating a cycle and "from" and "to" are the same vlist.
to->SwitchNew();
auto *to_updated = &to->update();
to_updated->in_.emplace(

View File

@ -1,12 +1,14 @@
/// @file
#pragma once
#include <experimental/optional>
#include <string>
#include <vector>
#include <glog/logging.h>
#include <cppitertools/filter.hpp>
#include <cppitertools/imap.hpp>
#include <glog/logging.h>
#include "database/graph_db.hpp"
#include "storage/address_types.hpp"

View File

@ -82,12 +82,14 @@ std::experimental::optional<VertexAccessor> BfsRpcClients::Pull(
CHECK(res) << "SubcursorPull RPC failed!";
if (!res->vertex) return std::experimental::nullopt;
data_manager_->Emplace<Vertex>(dba->transaction_id(),
res->vertex->global_address.gid(),
std::move(res->vertex->old_element_output),
std::move(res->vertex->new_element_output));
data_manager_->Emplace<Vertex>(
dba->transaction_id(), res->vertex->global_address.gid(),
distributed::CachedRecordData<Vertex>(
res->cypher_id, std::move(res->vertex->old_element_output),
std::move(res->vertex->new_element_output)));
return VertexAccessor(res->vertex->global_address, *dba);
}
bool BfsRpcClients::ExpandLevel(
const std::unordered_map<int16_t, int64_t> &subcursor_ids) {
auto futures = clients_->ExecuteOnWorkers<bool>(
@ -140,8 +142,11 @@ PathSegment BuildPathSegment(ReconstructPathRes *res,
distributed::DataManager *data_manager) {
std::vector<EdgeAccessor> edges;
for (auto &edge : res->edges) {
data_manager->Emplace<Edge>(dba->transaction_id(), edge.global_address.gid(), std::move(edge.old_element_output),
std::move(edge.new_element_output));
data_manager->Emplace<Edge>(
dba->transaction_id(), edge.global_address.gid(),
distributed::CachedRecordData<Edge>(
edge.cypher_id, std::move(edge.old_element_output),
std::move(edge.new_element_output)));
edges.emplace_back(edge.global_address, *dba);
}

View File

@ -54,7 +54,8 @@ cpp<#
cpp<#))
(lcp:define-struct (serialized-graph-element t-element) ()
((global-address "storage::Address<mvcc::VersionList<TElement>>"
((cypher-id :int64_t)
(global-address "storage::Address<mvcc::VersionList<TElement>>"
:capnp-type "Storage.Address")
(old-element-input "TElement *"
:capnp-type '((null "Void") (vertex "Dis.Vertex") (edge "Dis.Edge"))
@ -67,10 +68,12 @@ cpp<#
(worker-id :int16_t :capnp-save :dont-save))
(:public
#>cpp
SerializedGraphElement(storage::Address<mvcc::VersionList<TElement>> global_address,
SerializedGraphElement(int64_t cypher_id,
storage::Address<mvcc::VersionList<TElement>> global_address,
TElement *old_element_input, TElement *new_element_input,
int16_t worker_id)
: global_address(global_address),
: cypher_id(cypher_id),
global_address(global_address),
old_element_input(old_element_input),
old_element_output(nullptr),
new_element_input(new_element_input),
@ -81,8 +84,10 @@ cpp<#
}
SerializedGraphElement(const RecordAccessor<TElement> &accessor, int16_t worker_id)
: SerializedGraphElement(accessor.GlobalAddress(), accessor.GetOld(),
accessor.GetNew(), worker_id) {}
: SerializedGraphElement(accessor.CypherId(),
accessor.GlobalAddress(),
accessor.GetOld(), accessor.GetNew(),
worker_id) {}
SerializedGraphElement() {}
cpp<#)
@ -159,10 +164,13 @@ cpp<#
(lcp:define-rpc subcursor-pull
(:request ((member :int64_t)))
(:response ((vertex "std::experimental::optional<SerializedVertex>" :initarg :move
:capnp-type "Utils.Optional(SerializedGraphElement)"
:capnp-save (lcp:capnp-save-optional "capnp::SerializedGraphElement" "SerializedVertex")
:capnp-load (lcp:capnp-load-optional "capnp::SerializedGraphElement" "SerializedVertex")))))
(:response
((cypher-id :int64_t)
(vertex "std::experimental::optional<SerializedVertex>" :initarg :move
:capnp-type "Utils.Optional(SerializedGraphElement)"
:capnp-save (lcp:capnp-save-optional "capnp::SerializedGraphElement" "SerializedVertex")
:capnp-load (lcp:capnp-load-optional "capnp::SerializedGraphElement" "SerializedVertex")))))
(lcp:define-rpc set-source
(:request
((subcursor-id :int64_t)

View File

@ -1,4 +1,5 @@
/// @file
#pragma once
#include <map>
@ -84,7 +85,8 @@ class BfsRpcServer {
res.Save(res_builder);
return;
}
SubcursorPullRes res(SerializedVertex(*vertex, db_->WorkerId()));
SubcursorPullRes res(vertex->CypherId(),
SerializedVertex(*vertex, db_->WorkerId()));
res.Save(res_builder);
});
@ -111,8 +113,8 @@ class BfsRpcServer {
} else {
LOG(FATAL) << "`edge` or `vertex` should be set in ReconstructPathReq";
}
ReconstructPathRes res(result.edges, result.next_vertex,
result.next_edge, db_->WorkerId());
ReconstructPathRes res(result.edges, result.next_vertex, result.next_edge,
db_->WorkerId());
res.Save(res_builder);
});

View File

@ -1,99 +0,0 @@
#include "glog/logging.h"
#include "database/storage.hpp"
#include "distributed/cache.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
namespace distributed {
/// Returns the new record cached for `gid`. If the entry has no new record
/// yet, one is created by cloning the data of the old record. The cache lock
/// is held for the whole call.
template <typename TRecord>
TRecord *Cache<TRecord>::FindNew(gid::Gid gid) {
  std::lock_guard<std::mutex> guard{lock_};

  auto entry = cache_.find(gid);
  DCHECK(entry != cache_.end())
      << "FindNew for uninitialized remote Vertex/Edge";

  // The entry is an (old, new) pair of owning pointers.
  auto &old_new = entry->second;
  auto &new_record = old_new.second;
  if (!new_record) {
    new_record = std::unique_ptr<TRecord>(old_new.first->CloneData());
  }
  return new_record.get();
}
/// Looks up the old and new record for `gid` and stores raw pointers to them
/// in `old_record` and `new_record`. On a cache miss the record is requested
/// from `worker_id` through `data_clients_` and cached as the old record,
/// with no new record.
template <typename TRecord>
void Cache<TRecord>::FindSetOldNew(tx::TransactionId tx_id, int worker_id,
gid::Gid gid, TRecord *&old_record,
TRecord *&new_record) {
{
// First attempt: answer from the cache while holding the lock.
std::lock_guard<std::mutex> guard(lock_);
auto found = cache_.find(gid);
if (found != cache_.end()) {
old_record = found->second.first.get();
new_record = found->second.second.get();
return;
}
}
// The lock is not held while the remote element is fetched and localized.
auto remote = data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
LocalizeAddresses(*remote);
// This logic is a bit strange because we need to make sure that someone
// else didn't get a response and update the cache before we did, and we
// need a lock for that. We also need to check whether we can now return
// that result - otherwise we could get inconsistent results for remote
// FindSetOldNew.
std::lock_guard<std::mutex> guard(lock_);
auto it_pair = cache_.emplace(
gid, std::make_pair<rec_uptr, rec_uptr>(std::move(remote), nullptr));
// Read the pointers from whatever entry `emplace` returned.
old_record = it_pair.first->second.first.get();
new_record = it_pair.first->second.second.get();
}
/// Stores `old_record` and `new_record` under `gid`, unless the cache already
/// holds an entry for `gid`, in which case nothing is stored. Non-null
/// records have their addresses localized first.
template <typename TRecord>
void Cache<TRecord>::emplace(gid::Gid gid, rec_uptr old_record,
                             rec_uptr new_record) {
  // Localization is done before the lock is taken.
  if (old_record) LocalizeAddresses(*old_record);
  if (new_record) LocalizeAddresses(*new_record);

  std::lock_guard<std::mutex> guard{lock_};
  // We can't replace existing data because some accessors might be using
  // it.
  // TODO - consider if it's necessary and OK to copy just the data content.
  const bool already_cached = cache_.find(gid) != cache_.end();
  if (!already_cached) {
    cache_[gid] = std::make_pair(std::move(old_record), std::move(new_record));
  }
}
/// Removes every entry from the cache while holding the cache lock.
template <typename TRecord>
void Cache<TRecord>::ClearCache() {
std::lock_guard<std::mutex> guard{lock_};
cache_.clear();
}
/// Replaces the vertex and edge address of every entry in the vertex's
/// incoming and outgoing edge storage with the result of
/// `storage_.LocalizedAddressIfPossible`.
template <>
void Cache<Vertex>::LocalizeAddresses(Vertex &vertex) {
  const auto localize_entry = [this](auto &entry) {
    entry.vertex = storage_.LocalizedAddressIfPossible(entry.vertex);
    entry.edge = storage_.LocalizedAddressIfPossible(entry.edge);
  };

  // Incoming edges first, then outgoing ones.
  for (auto &entry : vertex.in_.storage()) localize_entry(entry);
  for (auto &entry : vertex.out_.storage()) localize_entry(entry);
}
/// Replaces the edge's `from_` and `to_` addresses with the result of
/// `storage_.LocalizedAddressIfPossible`.
template <>
void Cache<Edge>::LocalizeAddresses(Edge &edge) {
edge.from_ = storage_.LocalizedAddressIfPossible(edge.from_);
edge.to_ = storage_.LocalizedAddressIfPossible(edge.to_);
}
template class Cache<Vertex>;
template class Cache<Edge>;
} // namespace distributed

View File

@ -1,3 +1,5 @@
/// @file
#pragma once
#include <mutex>
@ -40,21 +42,13 @@ class Cache {
return cache_.emplace(std::forward<TKey>(key), std::forward<TValue>(value));
}
void erase(const TKey &key) {
cache_.erase(key);
}
Iterator end() {
return cache_.end();
}
void erase(const TKey &key) { cache_.erase(key); }
bool contains(const TKey &key) {
return find(key) != end();
}
Iterator end() { return cache_.end(); }
void clear() {
cache_.clear();
}
bool contains(const TKey &key) { return find(key) != end(); }
void clear() { cache_.clear(); }
private:
std::unordered_map<TKey, TValue> cache_;

View File

@ -1,3 +1,5 @@
/// @file
#pragma once
#include "data_structures/concurrent/concurrent_map.hpp"
@ -11,12 +13,26 @@ class Edge;
namespace distributed {
/// Cached copy of a vertex/edge that lives on another machine in the
/// distributed system: its cypher id plus the old and new record versions.
///
/// @tparam TRecord Vertex or Edge
template <typename TRecord>
struct CachedRecordData {
  /// Takes ownership of both record versions; either pointer may be null.
  CachedRecordData(int64_t cypher_id, std::unique_ptr<TRecord> old_record,
                   std::unique_ptr<TRecord> new_record)
      : cypher_id{cypher_id},
        old_record{std::move(old_record)},
        new_record{std::move(new_record)} {}

  int64_t cypher_id;
  std::unique_ptr<TRecord> old_record;
  std::unique_ptr<TRecord> new_record;
};
/// Handles remote data caches for edges and vertices, per transaction.
class DataManager {
template <typename TRecord>
using CacheG =
Cache<gid::Gid,
std::pair<std::unique_ptr<TRecord>, std::unique_ptr<TRecord>>>;
using CacheG = Cache<gid::Gid, CachedRecordData<TRecord>>;
template <typename TRecord>
using CacheT = ConcurrentMap<tx::TransactionId, CacheG<TRecord>>;
@ -35,12 +51,11 @@ class DataManager {
DCHECK(found != cache.end())
<< "FindNew is called on uninitialized remote Vertex/Edge";
auto &pair = found->second;
if (!pair.second) {
pair.second = std::unique_ptr<TRecord>(pair.first->CloneData());
auto &data = found->second;
if (!data.new_record) {
data.new_record = std::unique_ptr<TRecord>(data.old_record->CloneData());
}
return pair.second.get();
return data.new_record.get();
}
/// For the Vertex/Edge with the given global ID, looks for the data visible
@ -57,14 +72,14 @@ class DataManager {
std::lock_guard<std::mutex> guard(lock);
auto found = cache.find(gid);
if (found != cache.end()) {
*old_record = found->second.first.get();
*new_record = found->second.second.get();
*old_record = found->second.old_record.get();
*new_record = found->second.new_record.get();
return;
}
}
auto remote = data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
LocalizeAddresses(*remote);
LocalizeAddresses(*remote.record_ptr);
// This logic is a bit strange because we need to make sure that someone
// else didn't get a response and updated the cache before we did and we
@ -72,21 +87,45 @@ class DataManager {
// that result - otherwise we could get inconsistent results for remote
// FindSetOldNew
std::lock_guard<std::mutex> guard(lock);
auto it_pair = cache.emplace(std::move(gid),
std::make_pair(std::move(remote), nullptr));
auto it_pair = cache.emplace(
std::move(gid),
CachedRecordData<TRecord>(remote.cypher_id,
std::move(remote.record_ptr), nullptr));
*old_record = it_pair.first->second.first.get();
*new_record = it_pair.first->second.second.get();
*old_record = it_pair.first->second.old_record.get();
*new_record = it_pair.first->second.new_record.get();
}
/// Returns the cached data for record `gid` within transaction `tx_id`.
///
/// On a cache miss the record is requested from `worker_id` through
/// `data_clients_`, its addresses are localized, and the result is inserted
/// into the cache. The transaction's lock is released for the duration of
/// the remote request and re-acquired before the insertion.
///
/// @tparam TRecord Vertex or Edge
template <typename TRecord>
const CachedRecordData<TRecord> &Find(tx::TransactionId tx_id, int worker_id,
                                      gid::Gid gid) {
  auto &tx_cache = GetCache<TRecord>(tx_id);
  std::unique_lock<std::mutex> lock(GetLock(tx_id));

  auto cached = tx_cache.find(gid);
  if (cached != tx_cache.end()) return cached->second;

  // Cache miss: fetch the record without holding the lock.
  lock.unlock();
  auto fetched = data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
  LocalizeAddresses(*fetched.record_ptr);
  lock.lock();

  // `emplace` yields an (iterator, inserted) pair; the mapped value behind
  // the iterator is returned whether or not the insertion took place.
  auto emplaced = tx_cache.emplace(
      std::move(gid),
      CachedRecordData<TRecord>(fetched.cypher_id,
                                std::move(fetched.record_ptr), nullptr));
  return emplaced.first->second;
}
/// Sets the given records as (new, old) data for the given gid.
template <typename TRecord>
void Emplace(tx::TransactionId tx_id, gid::Gid gid,
std::unique_ptr<TRecord> old_record,
std::unique_ptr<TRecord> new_record) {
if (old_record) LocalizeAddresses(*old_record);
if (new_record) LocalizeAddresses(*new_record);
CachedRecordData<TRecord> data) {
if (data.old_record) LocalizeAddresses(*data.old_record);
if (data.new_record) LocalizeAddresses(*data.new_record);
std::lock_guard<std::mutex> guard(GetLock(tx_id));
// We can't replace existing data because some accessors might be using
@ -94,9 +133,7 @@ class DataManager {
// TODO - consider if it's necessary and OK to copy just the data content.
auto &cache = GetCache<TRecord>(tx_id);
auto found = cache.find(gid);
if (found == cache.end())
cache.emplace(std::move(gid), std::make_pair(std::move(old_record),
std::move(new_record)));
if (found == cache.end()) cache.emplace(std::move(gid), std::move(data));
}
/// Removes all the caches for a single transaction.

View File

@ -8,23 +8,25 @@
namespace distributed {
template <>
std::unique_ptr<Edge> DataRpcClients::RemoteElement(int worker_id,
tx::TransactionId tx_id,
gid::Gid gid) {
auto response =
clients_.GetClientPool(worker_id).Call<EdgeRpc>(TxGidPair{tx_id, gid});
CHECK(response) << "EdgeRpc failed";
return std::move(response->edge_output);
}
template <>
std::unique_ptr<Vertex> DataRpcClients::RemoteElement(int worker_id,
RemoteElementInfo<Edge> DataRpcClients::RemoteElement(int worker_id,
tx::TransactionId tx_id,
gid::Gid gid) {
auto response =
clients_.GetClientPool(worker_id).Call<EdgeRpc>(TxGidPair{tx_id, gid});
CHECK(response) << "EdgeRpc failed";
return RemoteElementInfo<Edge>(response->cypher_id,
std::move(response->edge_output));
}
template <>
RemoteElementInfo<Vertex> DataRpcClients::RemoteElement(int worker_id,
tx::TransactionId tx_id,
gid::Gid gid) {
auto response =
clients_.GetClientPool(worker_id).Call<VertexRpc>(TxGidPair{tx_id, gid});
CHECK(response) << "VertexRpc failed";
return std::move(response->vertex_output);
return RemoteElementInfo<Vertex>(response->cypher_id,
std::move(response->vertex_output));
}
std::unordered_map<int, int64_t> DataRpcClients::VertexCounts(

View File

@ -1,3 +1,5 @@
/// @file
#pragma once
#include <mutex>
@ -9,16 +11,35 @@
namespace distributed {
/// Result of fetching a record from another worker: the record's cypher id
/// and an owning pointer to the record data.
///
/// Move-constructible only; default construction, copying and both
/// assignments are disabled.
template <typename TRecord>
struct RemoteElementInfo {
  RemoteElementInfo(int64_t cypher_id, std::unique_ptr<TRecord> record_ptr)
      : cypher_id{cypher_id}, record_ptr{std::move(record_ptr)} {}

  RemoteElementInfo() = delete;

  // Not copyable.
  RemoteElementInfo(const RemoteElementInfo &) = delete;
  RemoteElementInfo &operator=(const RemoteElementInfo &) = delete;

  // TODO (buda): The default move constructor should be deleted but it seems
  // that clang-3.9 doesn't know how to do RVO when this struct is used.
  RemoteElementInfo(RemoteElementInfo &&) = default;
  RemoteElementInfo &operator=(RemoteElementInfo &&) = delete;

  int64_t cypher_id;
  std::unique_ptr<TRecord> record_ptr;
};
/// Provides access to other worker's data.
class DataRpcClients {
public:
DataRpcClients(RpcWorkerClients &clients) : clients_(clients) {}
/// Returns a remote worker's record (vertex/edge) data for the given params.
/// That worker must own the vertex/edge for the given id, and that vertex
/// must be visible in given transaction.
template <typename TRecord>
std::unique_ptr<TRecord> RemoteElement(int worker_id, tx::TransactionId tx_id,
gid::Gid gid);
RemoteElementInfo<TRecord> RemoteElement(int worker_id,
tx::TransactionId tx_id,
gid::Gid gid);
/// Returns (worker_id, vertex_count) for each worker and the number of
/// vertices on it from the perspective of transaction `tx_id`.

View File

@ -28,7 +28,8 @@ cpp<#
(lcp:define-rpc vertex
(:request ((member "TxGidPair")))
(:response
((vertex-input "const Vertex *"
((cypher-id :int64_t)
(vertex-input "const Vertex *"
:capnp-type "Dist.Vertex"
:capnp-save
(lambda (builder member)
@ -48,7 +49,8 @@ cpp<#
(lcp:define-rpc edge
(:request ((member "TxGidPair")))
(:response
((edge-input "const Edge *"
((cypher-id :int64_t)
(edge-input "const Edge *"
:capnp-type "Dist.Edge"
:capnp-save
(lambda (builder member)

View File

@ -17,7 +17,7 @@ DataRpcServer::DataRpcServer(database::DistributedGraphDb *db,
auto vertex = dba->FindVertex(req_reader.getMember().getGid(), false);
CHECK(vertex.GetOld())
<< "Old record must exist when sending vertex by RPC";
VertexRes response(vertex.GetOld(), db_->WorkerId());
VertexRes response(vertex.CypherId(), vertex.GetOld(), db_->WorkerId());
response.Save(res_builder);
});
@ -26,7 +26,7 @@ DataRpcServer::DataRpcServer(database::DistributedGraphDb *db,
auto dba = db_->Access(req_reader.getMember().getTxId());
auto edge = dba->FindEdge(req_reader.getMember().getGid(), false);
CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC";
EdgeRes response(edge.GetOld(), db_->WorkerId());
EdgeRes response(edge.CypherId(), edge.GetOld(), db_->WorkerId());
response.Save(res_builder);
});

View File

@ -132,7 +132,8 @@ frame (potentially embedded in lists/maps) is too error-prone.")
private:
cpp<#
(lcp:define-struct (graph-element-data t-record) ()
((global-address "storage::Address<mvcc::VersionList<TRecord>>")
((cypher-id :int64_t)
(global-address "storage::Address<mvcc::VersionList<TRecord>>")
(old-record "std::unique_ptr<TRecord>")
(new-record "std::unique_ptr<TRecord>")
(element-in-frame
@ -149,10 +150,11 @@ and the `element_in_frame` reference is used to set the appropriate accessor
to the appropriate value. Not used on side that generates the response.")
(:public
#>cpp
GraphElementData(storage::Address<mvcc::VersionList<TRecord>> address,
GraphElementData(int64_t cypher_id, storage::Address<mvcc::VersionList<TRecord>> address,
std::unique_ptr<TRecord> old_record, std::unique_ptr<TRecord> new_record,
query::TypedValue *element_in_frame)
: global_address(address),
: cypher_id(cypher_id),
global_address(address),
old_record(std::move(old_record)),
new_record(std::move(new_record)),
element_in_frame(element_in_frame) {}
@ -194,6 +196,7 @@ to the appropriate value. Not used on side that generates the response.")
const query::TypedValue &value,
distributed::capnp::TypedValue::Builder *builder) const {
auto save_element = [this](auto accessor, auto *builder) {
builder->setCypherId(accessor.CypherId());
builder->setAddress(accessor.GlobalAddress().raw());
// If both old and new are null, we need to reconstruct
if (!(accessor.GetOld() || accessor.GetNew())) {
@ -253,6 +256,7 @@ void PullResData::LoadGraphElement(
const distributed::capnp::TypedValue::Reader &reader,
query::TypedValue *value, distributed::DataManager *data_manager) {
auto load_vertex = [dba, data_manager](const auto &vertex_reader) {
int64_t cypher_id = vertex_reader.getCypherId();
storage::VertexAddress global_address(vertex_reader.getAddress());
auto old_record =
vertex_reader.hasOld()
@ -262,11 +266,17 @@ void PullResData::LoadGraphElement(
vertex_reader.hasNew()
? distributed::LoadVertex(vertex_reader.getNew())
: nullptr;
data_manager->Emplace<Vertex>(dba->transaction_id(), global_address.gid(),
std::move(old_record), std::move(new_record));
data_manager->Emplace<Vertex>(
dba->transaction_id(), global_address.gid(),
distributed::CachedRecordData<Vertex>(cypher_id,
std::move(old_record),
std::move(new_record)));
// We don't need to pass cypher_id here because cypher_id is going to be
// fetched from the cache.
return VertexAccessor(global_address, *dba);
};
auto load_edge = [dba, data_manager](const auto &edge_reader) {
int64_t cypher_id = edge_reader.getCypherId();
storage::EdgeAddress global_address(edge_reader.getAddress());
auto old_record =
edge_reader.hasOld()
@ -276,8 +286,11 @@ void PullResData::LoadGraphElement(
edge_reader.hasNew()
? distributed::LoadEdge(edge_reader.getNew())
: nullptr;
data_manager->Emplace<Edge>(dba->transaction_id(), global_address.gid(),
std::move(old_record), std::move(new_record));
data_manager->Emplace<Edge>(
dba->transaction_id(), global_address.gid(),
distributed::CachedRecordData<Edge>(cypher_id,
std::move(old_record),
std::move(new_record)));
return EdgeAccessor(global_address, *dba);
};
switch (reader.which()) {

View File

@ -53,15 +53,17 @@ struct TypedValue {
}
struct VertexAccessor {
address @0 :UInt64;
old @1 :Vertex;
new @2: Vertex;
cypherId @0 :Int64;
address @1 :UInt64;
old @2 :Vertex;
new @3 :Vertex;
}
struct EdgeAccessor {
address @0 :UInt64;
old @1 :Edge;
new @2: Edge;
cypherId @0 :Int64;
address @1 :UInt64;
old @2 :Edge;
new @3 :Edge;
}
struct Path {

View File

@ -1,4 +1,3 @@
#include <unordered_map>
#include <vector>
@ -24,7 +23,7 @@ void RaiseIfRemoteError(UpdateResult result) {
break;
}
}
}
} // namespace
UpdateResult UpdatesRpcClients::Update(int worker_id,
const database::StateDelta &delta) {
@ -33,34 +32,36 @@ UpdateResult UpdatesRpcClients::Update(int worker_id,
return res->member;
}
gid::Gid UpdatesRpcClients::CreateVertex(
CreatedVertexInfo UpdatesRpcClients::CreateVertex(
int worker_id, tx::TransactionId tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties) {
const std::unordered_map<storage::Property, query::TypedValue> &properties,
std::experimental::optional<int64_t> cypher_id) {
auto res = worker_clients_.GetClientPool(worker_id).Call<CreateVertexRpc>(
CreateVertexReqData{tx_id, labels, properties});
CreateVertexReqData{tx_id, labels, properties, cypher_id});
CHECK(res) << "CreateVertexRpc failed on worker: " << worker_id;
CHECK(res->member.result == UpdateResult::DONE)
<< "Remote Vertex creation result not UpdateResult::DONE";
return res->member.gid;
return CreatedVertexInfo(res->member.cypher_id, res->member.gid);
}
storage::EdgeAddress UpdatesRpcClients::CreateEdge(
CreatedEdgeInfo UpdatesRpcClients::CreateEdge(
tx::TransactionId tx_id, VertexAccessor &from, VertexAccessor &to,
storage::EdgeType edge_type) {
storage::EdgeType edge_type,
std::experimental::optional<int64_t> cypher_id) {
CHECK(from.address().is_remote()) << "In CreateEdge `from` must be remote";
int from_worker = from.address().worker_id();
auto res = worker_clients_.GetClientPool(from_worker)
.Call<CreateEdgeRpc>(CreateEdgeReqData{
from.gid(), to.GlobalAddress(), edge_type, tx_id});
auto res =
worker_clients_.GetClientPool(from_worker)
.Call<CreateEdgeRpc>(CreateEdgeReqData{from.gid(), to.GlobalAddress(),
edge_type, tx_id, cypher_id});
CHECK(res) << "CreateEdge RPC failed on worker: " << from_worker;
RaiseIfRemoteError(res->member.result);
return {res->member.gid, from_worker};
return CreatedEdgeInfo(res->member.cypher_id,
storage::EdgeAddress{res->member.gid, from_worker});
}
void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id,
VertexAccessor &from,
void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id, VertexAccessor &from,
storage::EdgeAddress edge_address,
VertexAccessor &to,
storage::EdgeType edge_type) {

View File

@ -27,20 +27,26 @@ class UpdatesRpcClients {
UpdateResult Update(int worker_id, const database::StateDelta &delta);
/// Creates a vertex on the given worker and returns its id.
gid::Gid CreateVertex(
CreatedVertexInfo CreateVertex(
int worker_id, tx::TransactionId tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties);
&properties,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/// Creates an edge on the given worker and returns its address. If the `to`
/// vertex is on the same worker as `from`, then all remote CRUD will be
/// handled by a call to this function. Otherwise a separate call to
/// `AddInEdge` might be necessary. Throws all the exceptions that can
/// occur remotely as a result of updating a vertex.
storage::EdgeAddress CreateEdge(tx::TransactionId tx_id,
VertexAccessor &from, VertexAccessor &to,
storage::EdgeType edge_type);
CreatedEdgeInfo CreateEdge(tx::TransactionId tx_id, VertexAccessor &from,
VertexAccessor &to, storage::EdgeType edge_type,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
// TODO (buda): Another machine in the cluster is asked to create an edge.
// cypher_id should be generated in that process. It probably doesn't make
// sense to have optional cypher id here. Maybe for the recovery purposes.
/// Adds the edge with the given address to the `to` vertex as an incoming
/// edge. Only used when `to` is remote and not on the same worker as `from`.
@ -61,8 +67,8 @@ class UpdatesRpcClients {
gid::Gid vertex_from_id,
storage::VertexAddress vertex_to_addr);
void RemoveInEdge(tx::TransactionId tx_id, int worker_id,
gid::Gid vertex_id, storage::EdgeAddress edge_address);
void RemoveInEdge(tx::TransactionId tx_id, int worker_id, gid::Gid vertex_id,
storage::EdgeAddress edge_address);
/// Calls for all the workers (except the given one) to apply their updates
/// and returns the future results.

View File

@ -48,6 +48,7 @@ cpp<#
(lcp:define-struct create-result ()
((result "UpdateResult")
(cypher-id :int64_t :documentation "Only valid if creation was successful.")
(gid "gid::Gid" :documentation "Only valid if creation was successful."))
(:serialize :capnp))
@ -82,7 +83,25 @@ cpp<#
distributed::LoadCapnpTypedValue(reader.getValue(), &value);
return std::make_pair(prop, value);
});
cpp<#)))
cpp<#))
(cypher-id "std::experimental::optional<int64_t>"
:capnp-type "Utils.Optional(Utils.BoxInt64)"
:capnp-save
(lambda (builder member)
#>cpp
utils::SaveOptional<utils::capnp::BoxInt64, int64_t>(
${member}, &${builder}, [](auto *builder, const auto &value) {
builder->setValue(value);
});
cpp<#)
:capnp-load
(lambda (reader member)
#>cpp
cypher_id = utils::LoadOptional<utils::capnp::BoxInt64, int64_t>(
${reader}, [](const auto &reader) {
return reader.getValue();
});
cpp<#)))
(:serialize :capnp))
(lcp:define-rpc create-vertex
@ -93,7 +112,25 @@ cpp<#
((from "gid::Gid")
(to "storage::VertexAddress")
(edge-type "storage::EdgeType")
(tx-id "tx::TransactionId"))
(tx-id "tx::TransactionId")
(cypher-id "std::experimental::optional<int64_t>"
:capnp-type "Utils.Optional(Utils.BoxInt64)"
:capnp-save
(lambda (builder member)
#>cpp
utils::SaveOptional<utils::capnp::BoxInt64, int64_t>(
${member}, &${builder}, [](auto *builder, const auto &value) {
builder->setValue(value);
});
cpp<#)
:capnp-load
(lambda (reader member)
#>cpp
cypher_id = utils::LoadOptional<utils::capnp::BoxInt64, int64_t>(
${reader}, [](const auto &reader) {
return reader.getValue();
});
cpp<#)))
(:serialize :capnp))
(lcp:define-rpc create-edge
@ -143,4 +180,28 @@ cpp<#
(:request ((member "RemoveInEdgeData")))
(:response ((member "UpdateResult"))))
;; Pair of identifiers (Cypher id and gid) of a newly created record.
;; Used as the return type of UpdatesRpcServer::TransactionUpdates::CreateVertex
;; and UpdatesRpcServer::TransactionUpdates::CreateEdge.
(lcp:define-struct created-info ()
((cypher-id "int64_t")
(gid "gid::Gid"))
(:public #>cpp
CreatedInfo(int64_t cypher_id, gid::Gid gid)
: cypher_id(cypher_id), gid(gid) {}
cpp<#))
;; Cypher id and gid of a vertex created on a worker.
;; Used as the return type of UpdatesRpcClients::CreateVertex.
(lcp:define-struct created-vertex-info ()
((cypher-id "int64_t")
(gid "gid::Gid"))
(:public #>cpp
CreatedVertexInfo(int64_t cypher_id, gid::Gid gid)
: cypher_id(cypher_id), gid(gid) {}
cpp<#))
;; Cypher id and address of an edge created on a worker.
;; Used as the return type of UpdatesRpcClients::CreateEdge.
(lcp:define-struct created-edge-info ()
((cypher-id "int64_t")
(edge-address "storage::EdgeAddress"))
(:public #>cpp
CreatedEdgeInfo(int64_t cypher_id, storage::EdgeAddress edge_address)
: cypher_id(cypher_id), edge_address(edge_address) {}
cpp<#))
(lcp:pop-namespace) ;; distributed

View File

@ -60,32 +60,34 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Emplace(
}
template <typename TRecordAccessor>
gid::Gid UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex(
CreatedInfo UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex(
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties) {
auto result = db_accessor_->InsertVertex();
const std::unordered_map<storage::Property, query::TypedValue> &properties,
std::experimental::optional<int64_t> cypher_id) {
auto result =
db_accessor_->InsertVertex(std::experimental::nullopt, cypher_id);
for (auto &label : labels) result.add_label(label);
for (auto &kv : properties) result.PropsSet(kv.first, kv.second);
std::lock_guard<utils::SpinLock> guard{lock_};
deltas_.emplace(result.gid(),
std::make_pair(result, std::vector<database::StateDelta>{}));
return result.gid();
return CreatedInfo(result.CypherId(), result.gid());
}
template <typename TRecordAccessor>
gid::Gid UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateEdge(
CreatedInfo UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateEdge(
gid::Gid from, storage::VertexAddress to, storage::EdgeType edge_type,
int worker_id) {
int worker_id, std::experimental::optional<int64_t> cypher_id) {
auto &db = db_accessor_->db();
auto from_addr = db.storage().LocalizedAddressIfPossible(
storage::VertexAddress(from, worker_id));
auto to_addr = db.storage().LocalizedAddressIfPossible(to);
auto edge = db_accessor_->InsertOnlyEdge(from_addr, to_addr, edge_type);
auto edge = db_accessor_->InsertOnlyEdge(
from_addr, to_addr, edge_type, std::experimental::nullopt, cypher_id);
std::lock_guard<utils::SpinLock> guard{lock_};
deltas_.emplace(edge.gid(),
std::make_pair(edge, std::vector<database::StateDelta>{}));
return edge.gid();
return CreatedInfo(edge.CypherId(), edge.gid());
}
template <typename TRecordAccessor>
@ -218,9 +220,11 @@ UpdatesRpcServer::UpdatesRpcServer(database::DistributedGraphDb *db,
auto *res_builder) {
CreateVertexReq req;
req.Load(req_reader);
gid::Gid gid = GetUpdates(vertex_updates_, req.member.tx_id)
.CreateVertex(req.member.labels, req.member.properties);
CreateVertexRes res(CreateResult{UpdateResult::DONE, gid});
auto result = GetUpdates(vertex_updates_, req.member.tx_id)
.CreateVertex(req.member.labels, req.member.properties,
req.member.cypher_id);
CreateVertexRes res(
CreateResult{UpdateResult::DONE, result.cypher_id, result.gid});
res.Save(res_builder);
});
@ -337,14 +341,17 @@ UpdatesRpcServer::TransactionUpdates<TAccessor> &UpdatesRpcServer::GetUpdates(
}
CreateResult UpdatesRpcServer::CreateEdge(const CreateEdgeReqData &req) {
auto gid = GetUpdates(edge_updates_, req.tx_id)
.CreateEdge(req.from, req.to, req.edge_type, db_->WorkerId());
auto ids = GetUpdates(edge_updates_, req.tx_id)
.CreateEdge(req.from, req.to, req.edge_type, db_->WorkerId(),
req.cypher_id);
// cypher_id doesn't have to be inserted because edge is stored
// somewhere else in the cluster. Here is only vertex update.
auto from_delta = database::StateDelta::AddOutEdge(
req.tx_id, req.from, req.to, {gid, db_->WorkerId()}, req.edge_type);
req.tx_id, req.from, req.to, {ids.gid, db_->WorkerId()}, req.edge_type);
auto result = GetUpdates(vertex_updates_, req.tx_id).Emplace(from_delta);
return {result, gid};
return {result, ids.cypher_id, ids.gid};
}
UpdateResult UpdatesRpcServer::RemoveEdge(const RemoveEdgeData &data) {

View File

@ -1,3 +1,5 @@
/// @file
#pragma once
#include <unordered_map>
@ -36,21 +38,25 @@ class UpdatesRpcServer {
tx::TransactionId tx_id)
: db_accessor_(db->Access(tx_id)) {}
/// Adds a delta and returns the result. Does not modify the state (data) of
/// the graph element the update is for, but calls the `update` method to
/// fail-fast on serialization and update-after-delete errors.
/// Adds a delta and returns the result. Does not modify the state (data)
/// of the graph element the update is for, but calls the `update` method
/// to fail-fast on serialization and update-after-delete errors.
UpdateResult Emplace(const database::StateDelta &delta);
/// Creates a new vertex and returns it's gid.
gid::Gid CreateVertex(
/// Creates a new vertex and returns its cypher_id and gid.
CreatedInfo CreateVertex(
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties);
&properties,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/// Creates a new edge and returns it's gid. Does not update vertices at the
/// end of the edge.
gid::Gid CreateEdge(gid::Gid from, storage::VertexAddress to,
storage::EdgeType edge_type, int worker_id);
/// Creates a new edge and returns its cypher_id and gid. Does not update
/// vertices at the end of the edge.
CreatedInfo CreateEdge(gid::Gid from, storage::VertexAddress to,
storage::EdgeType edge_type, int worker_id,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/// Applies all the deltas on the record.
UpdateResult Apply();
@ -74,8 +80,8 @@ class UpdatesRpcServer {
communication::rpc::Server *server);
/// Applies all existing updates for the given transaction ID. If there are
/// no updates for that transaction, nothing happens. Clears the updates cache
/// after applying them, regardless of the result.
/// no updates for that transaction, nothing happens. Clears the updates
/// cache after applying them, regardless of the result.
UpdateResult Apply(tx::TransactionId tx_id);
/// Clears the cache of local transactions that are completed. The signature

View File

@ -68,7 +68,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
}
for (const auto &edge : dba.Edges(false)) {
encoder.WriteEdge(glue::ToBoltEdge(edge));
encoder.WriteInt(edge.cypher_id());
encoder.WriteInt(edge.CypherId());
edge_num++;
}
buffer.WriteValue(vertex_num);

View File

@ -17,7 +17,7 @@ class SnapshotEncoder : public communication::bolt::BaseEncoder<Buffer> {
glue::ToBoltVertex(vertex));
// Write cypher_id
this->WriteInt(vertex.cypher_id());
this->WriteInt(vertex.CypherId());
// Write in edges without properties
this->WriteUInt(vertex.in_degree());

View File

@ -224,9 +224,6 @@ class VersionList {
record->mark_expired(t);
}
/**
* TODO (buda): Try to move git_ to storage::Address.
*/
const gid::Gid gid_;
auto cypher_id() { return cypher_id_; }

View File

@ -616,10 +616,10 @@ TypedValue Id(TypedValue *args, int64_t nargs, Context *ctx) {
auto &arg = args[0];
switch (arg.type()) {
case TypedValue::Type::Vertex: {
return TypedValue(arg.ValueVertex().cypher_id());
return TypedValue(arg.ValueVertex().CypherId());
}
case TypedValue::Type::Edge: {
return TypedValue(arg.ValueEdge().cypher_id());
return TypedValue(arg.ValueEdge().CypherId());
}
default:
throw QueryRuntimeException("'id' argument must be a node or an edge.");

View File

@ -1191,8 +1191,9 @@ VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom,
properties.emplace(kv.first.second, std::move(value));
}
auto new_node = database::InsertVertexIntoRemote(
&dba, worker_id, node_atom->labels_, properties);
auto new_node =
database::InsertVertexIntoRemote(&dba, worker_id, node_atom->labels_,
properties, std::experimental::nullopt);
frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node;
return frame[context.symbol_table_.at(*node_atom->identifier_)].ValueVertex();
}

View File

@ -2010,8 +2010,7 @@ bool RemoveLabels::RemoveLabelsCursor::Pull(Frame &frame, Context &context) {
try {
for (auto label : self_.labels_) vertex.remove_label(label);
} catch (const RecordDeletedError &) {
throw QueryRuntimeException(
"Trying to remove labels from a deleted node.");
throw QueryRuntimeException("Trying to remove labels from a deleted node.");
}
return true;

View File

@ -22,32 +22,38 @@ void VertexMigrator::MigrateVertex(VertexAccessor &vertex, int destination) {
};
auto relocated_vertex = database::InsertVertexIntoRemote(
dba_, destination, vertex.labels(), get_props(vertex));
dba_, destination, vertex.labels(), get_props(vertex), vertex.CypherId());
vertex_migrated_to_[vertex.gid()] = relocated_vertex.address();
for (auto in_edge : vertex.in()) {
auto from = in_edge.from();
update_if_moved(from);
auto new_in_edge =
dba_->InsertEdge(from, relocated_vertex, in_edge.EdgeType());
for (auto prop : get_props(in_edge)) {
new_in_edge.PropsSet(prop.first, prop.second);
}
}
for (auto out_edge : vertex.out()) {
auto to = out_edge.to();
// Continue on self-loops since those edges have already been added
// while iterating over in edges
if (to == vertex) continue;
update_if_moved(to);
// Here cypher_id has to be passed to the other machine because this
// machine owns the edge.
auto new_out_edge =
dba_->InsertEdge(relocated_vertex, to, out_edge.EdgeType());
dba_->InsertEdge(relocated_vertex, to, out_edge.EdgeType(),
std::experimental::nullopt, out_edge.CypherId());
for (auto prop : get_props(out_edge)) {
new_out_edge.PropsSet(prop.first, prop.second);
}
}
for (auto in_edge : vertex.in()) {
auto from = in_edge.from();
// Continue on self-loops since those edges have already been added
// while iterating over out edges.
if (from == vertex) continue;
update_if_moved(from);
// The gid is left without a value because this machine doesn't own the
// edge; the cypher_id is still forwarded so the new edge keeps the same id.
auto new_in_edge =
dba_->InsertEdge(from, relocated_vertex, in_edge.EdgeType(),
std::experimental::nullopt, in_edge.CypherId());
for (auto prop : get_props(in_edge)) {
new_in_edge.PropsSet(prop.first, prop.second);
}
}
dba_->DetachRemoveVertex(vertex);
}

View File

@ -165,11 +165,18 @@ TRecord &RecordAccessor<TRecord>::update() const {
return *new_;
}
/// Returns the Cypher id of this record, as reported by the accessor's
/// implementation object.
template <typename TRecord>
int64_t RecordAccessor<TRecord>::CypherId() const {
  const auto &self = *this;
  return impl_->CypherId(self);
}
template <typename TRecord>
const TRecord &RecordAccessor<TRecord>::current() const {
// Edges have lazily initialized mutable, versioned data (properties).
if (std::is_same<TRecord, Edge>::value && current_ == nullptr)
RecordAccessor::Reconstruct();
if (std::is_same<TRecord, Edge>::value && current_ == nullptr) {
bool reconstructed = Reconstruct();
DCHECK(reconstructed) << "Unable to initialize record";
}
DCHECK(current_ != nullptr) << "RecordAccessor.current_ pointer is nullptr";
return *current_;
}

View File

@ -53,6 +53,7 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
/** Process a change delta, e.g. by writing WAL. */
virtual void ProcessDelta(const RecordAccessor<TRecord> &ra,
const database::StateDelta &delta) = 0;
virtual int64_t CypherId(const RecordAccessor<TRecord> &ra) = 0;
};
// this class is default copyable, movable and assignable
@ -176,12 +177,10 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
* owner is some other worker in a distributed system. */
bool is_local() const { return address_.is_local(); }
int64_t cypher_id() const {
if (address_.is_local())
return address_.local()->cypher_id();
else
throw utils::NotYetImplemented("Fetch remote cypher_id");
}
/**
* Returns Cypher Id of this record.
*/
int64_t CypherId() const;
protected:
/**

View File

@ -67,12 +67,12 @@ target_link_libraries(${test_prefix}distributed_coordination memgraph_lib kvstor
add_unit_test(distributed_data_exchange.cpp)
target_link_libraries(${test_prefix}distributed_data_exchange memgraph_lib kvstore_dummy_lib)
add_unit_test(distributed_dgp_vertex_migrator.cpp)
target_link_libraries(${test_prefix}distributed_dgp_vertex_migrator memgraph_lib kvstore_dummy_lib)
add_unit_test(distributed_durability.cpp)
target_link_libraries(${test_prefix}distributed_durability memgraph_lib kvstore_dummy_lib)
add_unit_test(distributed_dynamic_graph_partitioner.cpp)
target_link_libraries(${test_prefix}distributed_dynamic_graph_partitioner memgraph_lib kvstore_dummy_lib)
add_unit_test(distributed_gc.cpp)
target_link_libraries(${test_prefix}distributed_gc memgraph_lib kvstore_dummy_lib)
@ -94,9 +94,6 @@ target_link_libraries(${test_prefix}distributed_serialization memgraph_lib kvsto
add_unit_test(distributed_updates.cpp)
target_link_libraries(${test_prefix}distributed_updates memgraph_lib kvstore_dummy_lib)
# add_unit_test(distributed_vertex_migrator.cpp)
# target_link_libraries(${test_prefix}distributed_vertex_migrator memgraph_lib kvstore_dummy_lib)
add_unit_test(durability.cpp)
target_link_libraries(${test_prefix}durability memgraph_lib kvstore_dummy_lib)

View File

@ -50,8 +50,8 @@ class DistributedGraphDbTest : public ::testing::Test {
master_config.durability_directory = tmp_dir_;
// This is semantically wrong since this is not a cluster of size 1 but of
// size kWorkerCount+1, but it's hard to wait here for workers to recover
// and simultaneously assign the port to which the workers must connect
// TODO(dgleich): Fix sometime in the future - not mission critical
// and simultaneously assign the port to which the workers must connect.
// TODO (buda): Fix sometime in the future - not mission critical.
master_config.recovering_cluster_size = 1;
master_ = std::make_unique<database::Master>(modify_config(master_config));
@ -121,8 +121,8 @@ class DistributedGraphDbTest : public ::testing::Test {
auto dba = master().Access();
VertexAccessor from{from_addr, *dba};
VertexAccessor to{to_addr, *dba};
auto r_val =
dba->InsertEdge(from, to, dba->EdgeType(edge_type_name)).GlobalAddress();
auto r_val = dba->InsertEdge(from, to, dba->EdgeType(edge_type_name))
.GlobalAddress();
master().updates_server().Apply(dba->transaction_id());
worker(1).updates_server().Apply(dba->transaction_id());
worker(2).updates_server().Apply(dba->transaction_id());

View File

@ -0,0 +1,401 @@
#include "distributed_common.hpp"

#include <algorithm>
#include <experimental/optional>
#include <memory>
#include <thread>
#include <unordered_set>
#include <vector>

#include "gtest/gtest.h"

#include "distributed/updates_rpc_clients.hpp"
#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
using namespace distributed;
using namespace database;
/// Check if the following data is migrated correctly across the cluster:
/// * cypher_id
/// * labels
/// * edge_types
/// * properties
class DistributedVertexMigratorTest : public DistributedGraphDbTest {
private:
struct GraphSize {
int worker_id;
int vertex_no;
int edge_no;
};
public:
DistributedVertexMigratorTest() : DistributedGraphDbTest("vertex_migrator") {}
/**
* Prefill the cluster with vertices and edges so that the ids are not the
* same across the cluster.
*/
void FillOutCluster(const std::vector<int> graph_sizes) {
for (int i = 0; i < graph_sizes[0]; ++i) {
auto vaddr = InsertVertex(master());
InsertEdge(vaddr, vaddr, "edge");
}
for (int i = 0; i < graph_sizes[1]; ++i) {
auto vaddr = InsertVertex(worker(1));
InsertEdge(vaddr, vaddr, "edge");
}
for (int i = 0; i < graph_sizes[2]; ++i) {
auto vaddr = InsertVertex(worker(2));
InsertEdge(vaddr, vaddr, "edge");
}
}
/**
* Wait for all futures and commit the transaction.
*/
void MasterApplyUpdatesAndCommit(database::GraphDbAccessor *dba) {
{
auto apply_futures = master().updates_clients().UpdateApplyAll(
master().WorkerId(), dba->transaction().id_);
// Destructor waits on application
}
dba->Commit();
}
/**
* Migrate vertex with a given cypher_id from a given database to a given
* machine.
*/
void MigrateVertexAndCommit(database::GraphDbAccessor *from_dba,
int64_t cypher_id, int to_worker_id) {
auto vacc = FindVertex(from_dba, cypher_id);
VertexMigrator migrator(from_dba);
migrator.MigrateVertex(*vacc, to_worker_id);
MasterApplyUpdatesAndCommit(from_dba);
}
/**
* Assert number of vertices and edges on each worker.
*
* @param sizes An array of structs that hold information about graph size
* on each worker.
*/
void CheckGraphSizes(const std::vector<GraphSize> &graph_sizes) {
for (auto &graph_size : graph_sizes) {
if (graph_size.worker_id == 0) { // on master
ASSERT_EQ(VertexCount(master()), graph_size.vertex_no);
ASSERT_EQ(EdgeCount(master()), graph_size.edge_no);
} else { // on workers
ASSERT_EQ(VertexCount(worker(graph_size.worker_id)),
graph_size.vertex_no);
ASSERT_EQ(EdgeCount(worker(graph_size.worker_id)), graph_size.edge_no);
}
}
}
/**
* Collect all visible cypher_ids into an unordered_map for easier
* checking.
*/
auto CollectVertexCypherIds(database::GraphDbAccessor *dba) {
std::unordered_set<int64_t> cypher_ids;
for (auto &vertex : dba->Vertices(false)) {
cypher_ids.emplace(vertex.CypherId());
}
return cypher_ids;
}
/**
* Collect all visible cypher_ids into an unordered_map for easier
* checking.
*/
auto CollectEdgeCypherIds(database::GraphDbAccessor *dba) {
std::unordered_set<int64_t> cypher_ids;
for (auto &edge : dba->Edges(false)) {
cypher_ids.emplace(edge.CypherId());
}
return cypher_ids;
}
/**
* Check that container contains all containees.
*
* @tparam type of elements in the sets.
*/
template <typename T>
auto ContainsAll(const std::unordered_set<T> &container,
const std::unordered_set<T> &containees) {
// TODO (C++20): container.contains(item);
return std::all_of(containees.begin(), containees.end(),
[&container](T item) {
return container.find(item) != container.end();
});
}
/**
* Find vertex with a given cypher_id within a given database.
*/
std::experimental::optional<VertexAccessor> FindVertex(
database::GraphDbAccessor *dba, int64_t cypher_id) {
for (auto &vertex : dba->Vertices(false)) {
if (vertex.CypherId() == cypher_id)
return std::experimental::optional<VertexAccessor>(vertex);
}
return std::experimental::nullopt;
}
/**
* Find edge with a given cypher_id within a given database.
*/
std::experimental::optional<EdgeAccessor> FindEdge(
database::GraphDbAccessor *dba, int64_t cypher_id) {
for (auto &edge : dba->Edges(false)) {
if (edge.CypherId() == cypher_id)
return std::experimental::optional<EdgeAccessor>(edge);
}
return std::experimental::nullopt;
}
};
// Migrates two labelled, connected vertices from master to worker 1 and
// checks that labels, properties and edge types arrive with them.
TEST_F(DistributedVertexMigratorTest, MigrationofLabelsEdgeTypesAndProperties) {
  // Step 1: build the graph on master.
  {
    auto setup_dba = master().Access();
    auto first = setup_dba->InsertVertex();
    auto second = setup_dba->InsertVertex();
    first.add_label(setup_dba->Label("l"));
    first.add_label(setup_dba->Label("k"));
    second.add_label(setup_dba->Label("l"));
    second.add_label(setup_dba->Label("k"));
    first.PropsSet(setup_dba->Property("p"), 42);
    second.PropsSet(setup_dba->Property("p"), 42);

    auto forward_edge =
        setup_dba->InsertEdge(first, second, setup_dba->EdgeType("edge"));
    forward_edge.PropsSet(setup_dba->Property("pe"), 43);
    auto backward_edge =
        setup_dba->InsertEdge(second, first, setup_dba->EdgeType("edge"));
    backward_edge.PropsSet(setup_dba->Property("pe"), 43);
    setup_dba->Commit();
  }

  // Step 2: move every vertex that lives on master over to worker 1.
  {
    auto migration_dba = master().Access();
    VertexMigrator migrator(migration_dba.get());
    for (auto &vertex : migration_dba->Vertices(false)) {
      migrator.MigrateVertex(vertex, worker(1).WorkerId());
    }
    MasterApplyUpdatesAndCommit(migration_dba.get());
  }

  // Step 3: inspect what arrived on worker 1.
  {
    auto check_dba = worker(1).Access();
    EXPECT_EQ(VertexCount(master()), 0);
    ASSERT_EQ(VertexCount(worker(1)), 2);
    for (auto vertex : check_dba->Vertices(false)) {
      ASSERT_EQ(vertex.labels().size(), 2);
      EXPECT_EQ(vertex.labels()[0], check_dba->Label("l"));
      EXPECT_EQ(vertex.labels()[1], check_dba->Label("k"));
      EXPECT_EQ(vertex.PropsAt(check_dba->Property("p")).Value<int64_t>(), 42);
    }

    ASSERT_EQ(EdgeCount(worker(1)), 2);
    auto migrated_edge = *check_dba->Edges(false).begin();
    EXPECT_EQ(migrated_edge.PropsAt(check_dba->Property("pe")).Value<int64_t>(),
              43);
    EXPECT_EQ(migrated_edge.EdgeType(), check_dba->EdgeType("edge"));
  }
}
// A vertex with a single self-loop is migrated from master to worker 1; both
// the vertex and the edge must keep their cypher ids and end up fully local.
TEST_F(DistributedVertexMigratorTest, MigrationOfSelfLoopEdge) {
  FillOutCluster({10, 0, 0});

  // Create additional node on master and migrate that node to worker1.
  auto loop_vertex_addr = InsertVertex(master());
  auto loop_edge_addr = InsertEdge(loop_vertex_addr, loop_vertex_addr, "edge");
  auto lookup_dba = master().Access();
  VertexAccessor loop_vertex(loop_vertex_addr, *lookup_dba);
  EdgeAccessor loop_edge(loop_edge_addr, *lookup_dba);
  auto initial_vcypher_id = loop_vertex.CypherId();
  auto initial_ecypher_id = loop_edge.CypherId();

  {
    auto migration_dba = master().Access();
    MigrateVertexAndCommit(migration_dba.get(), initial_vcypher_id,
                           worker(1).WorkerId());
  }

  // Check graph size and cypher_ids.
  CheckGraphSizes({{0, 10, 10}, {1, 1, 1}, {2, 0, 0}});
  {
    auto worker_dba = worker(1).Access();
    auto migrated_vertex = *worker_dba->Vertices(false).begin();
    auto migrated_edge = *worker_dba->Edges(false).begin();
    ASSERT_EQ(migrated_vertex.CypherId(), initial_vcypher_id);
    ASSERT_EQ(migrated_edge.CypherId(), initial_ecypher_id);
    ASSERT_TRUE(migrated_edge.from_addr().is_local());
    ASSERT_TRUE(migrated_edge.to_addr().is_local());
  }
}
// Two connected vertices are moved around the cluster one at a time; after
// every hop the graph sizes and the surviving cypher ids are verified.
TEST_F(DistributedVertexMigratorTest, MigrationOfSimpleVertex) {
  FillOutCluster({1, 100, 200});

  auto source_addr = InsertVertex(master());
  auto target_addr = InsertVertex(master());
  auto edge_addr = InsertEdge(source_addr, target_addr, "edge");

  auto lookup_dba = master().Access();
  auto original_v1_cypher_id =
      VertexAccessor(source_addr, *lookup_dba).CypherId();
  auto original_v2_cypher_id =
      VertexAccessor(target_addr, *lookup_dba).CypherId();
  std::unordered_set<int64_t> original_v_cypher_ids = {original_v1_cypher_id,
                                                       original_v2_cypher_id};
  auto original_e1_cypher_id = EdgeAccessor(edge_addr, *lookup_dba).CypherId();
  std::unordered_set<int64_t> original_e_cypher_ids = {original_e1_cypher_id};

  CheckGraphSizes({{0, 3, 2}, {1, 100, 100}, {2, 200, 200}});

  // Migrate v2 from master to worker1.
  {
    auto source_dba = master().Access();
    MigrateVertexAndCommit(source_dba.get(), original_v2_cypher_id,
                           worker(1).WorkerId());
  }
  CheckGraphSizes({{0, 2, 2}, {1, 101, 100}, {2, 200, 200}});
  {
    auto worker1_dba = worker(1).Access();
    auto migrated_v2 = FindVertex(worker1_dba.get(), original_v2_cypher_id);
    ASSERT_TRUE(migrated_v2);
  }

  // Migrate v1 from master to worker1.
  {
    auto source_dba = master().Access();
    MigrateVertexAndCommit(source_dba.get(), original_v1_cypher_id,
                           worker(1).WorkerId());
  }
  CheckGraphSizes({{0, 1, 1}, {1, 102, 101}, {2, 200, 200}});
  {
    auto worker1_dba = worker(1).Access();
    auto vertex_ids_on_worker1 = CollectVertexCypherIds(worker1_dba.get());
    auto edge_ids_on_worker1 = CollectEdgeCypherIds(worker1_dba.get());
    ASSERT_TRUE(ContainsAll(vertex_ids_on_worker1, original_v_cypher_ids));
    ASSERT_TRUE(ContainsAll(edge_ids_on_worker1, original_e_cypher_ids));
  }

  // Migrate v1 from worker1 to worker2.
  {
    auto source_dba = worker(1).Access();
    MigrateVertexAndCommit(source_dba.get(), original_v1_cypher_id,
                           worker(2).WorkerId());
  }
  CheckGraphSizes({{0, 1, 1}, {1, 101, 100}, {2, 201, 201}});
  {
    auto worker2_dba = worker(2).Access();
    auto vertex_ids_on_worker2 = CollectVertexCypherIds(worker2_dba.get());
    auto edge_ids_on_worker2 = CollectEdgeCypherIds(worker2_dba.get());
    ASSERT_TRUE(ContainsAll(vertex_ids_on_worker2, {original_v1_cypher_id}));
    ASSERT_TRUE(ContainsAll(edge_ids_on_worker2, {original_e1_cypher_id}));
  }

  // Migrate v2 from worker1 to master.
  {
    auto source_dba = worker(1).Access();
    MigrateVertexAndCommit(source_dba.get(), original_v2_cypher_id,
                           master().WorkerId());
  }
  CheckGraphSizes({{0, 2, 1}, {1, 100, 100}, {2, 201, 201}});
  {
    auto master_dba = master().Access();
    auto worker2_dba = worker(2).Access();
    auto vertex_ids_on_master = CollectVertexCypherIds(master_dba.get());
    auto vertex_ids_on_worker2 = CollectVertexCypherIds(worker2_dba.get());
    auto edge_ids_on_worker2 = CollectEdgeCypherIds(worker2_dba.get());
    ASSERT_TRUE(ContainsAll(vertex_ids_on_master, {original_v2_cypher_id}));
    ASSERT_TRUE(ContainsAll(vertex_ids_on_worker2, {original_v1_cypher_id}));
    ASSERT_TRUE(ContainsAll(edge_ids_on_worker2, {original_e1_cypher_id}));
  }

  // Migrate v2 from master to worker2.
  {
    auto source_dba = master().Access();
    MigrateVertexAndCommit(source_dba.get(), original_v2_cypher_id,
                           worker(2).WorkerId());
  }
  CheckGraphSizes({{0, 1, 1}, {1, 100, 100}, {2, 202, 201}});
  {
    auto worker2_dba = worker(2).Access();
    auto vertex_ids_on_worker2 = CollectVertexCypherIds(worker2_dba.get());
    auto edge_ids_on_worker2 = CollectEdgeCypherIds(worker2_dba.get());
    ASSERT_TRUE(ContainsAll(vertex_ids_on_worker2, original_v_cypher_ids));
    ASSERT_TRUE(ContainsAll(edge_ids_on_worker2, original_e_cypher_ids));
  }
}
// A hub vertex with six outgoing edges (two per cluster member) is migrated
// master -> worker1 -> worker2 -> master; its edges must follow it each time.
TEST_F(DistributedVertexMigratorTest, MigrationOfVertexWithMultipleEdges) {
  FillOutCluster({1, 100, 200});

  auto master_peer1 = InsertVertex(master());
  auto master_peer2 = InsertVertex(master());
  auto hub_addr = InsertVertex(master());

  auto lookup_dba = master().Access();
  auto original_v3_cypher_id = VertexAccessor(hub_addr, *lookup_dba).CypherId();

  auto worker1_peer1 = InsertVertex(worker(1));
  auto worker1_peer2 = InsertVertex(worker(1));
  auto worker2_peer1 = InsertVertex(worker(2));
  auto worker2_peer2 = InsertVertex(worker(2));

  auto to_master1 = InsertEdge(hub_addr, master_peer1, "edge");
  auto to_master2 = InsertEdge(hub_addr, master_peer2, "edge");
  auto to_worker1_a = InsertEdge(hub_addr, worker1_peer1, "edge");
  auto to_worker1_b = InsertEdge(hub_addr, worker1_peer2, "edge");
  auto to_worker2_a = InsertEdge(hub_addr, worker2_peer1, "edge");
  auto to_worker2_b = InsertEdge(hub_addr, worker2_peer2, "edge");

  std::unordered_set<int64_t> original_e_cypher_ids = {
      EdgeAccessor(to_master1, *lookup_dba).CypherId(),
      EdgeAccessor(to_master2, *lookup_dba).CypherId(),
      EdgeAccessor(to_worker1_a, *lookup_dba).CypherId(),
      EdgeAccessor(to_worker1_b, *lookup_dba).CypherId(),
      EdgeAccessor(to_worker2_a, *lookup_dba).CypherId(),
      EdgeAccessor(to_worker2_b, *lookup_dba).CypherId()};

  CheckGraphSizes({{0, 4, 7}, {1, 102, 100}, {2, 202, 200}});

  // Migrate v3 from master to worker1.
  {
    auto source_dba = master().Access();
    MigrateVertexAndCommit(source_dba.get(), original_v3_cypher_id,
                           worker(1).WorkerId());
  }
  CheckGraphSizes({{0, 3, 1}, {1, 103, 106}, {2, 202, 200}});
  {
    auto worker1_dba = worker(1).Access();
    auto vertex_ids = CollectVertexCypherIds(worker1_dba.get());
    auto edge_ids = CollectEdgeCypherIds(worker1_dba.get());
    ASSERT_TRUE(ContainsAll(vertex_ids, {original_v3_cypher_id}));
    ASSERT_TRUE(ContainsAll(edge_ids, original_e_cypher_ids));
  }

  // Migrate v3 from worker1 to worker2.
  {
    auto source_dba = worker(1).Access();
    MigrateVertexAndCommit(source_dba.get(), original_v3_cypher_id,
                           worker(2).WorkerId());
  }
  CheckGraphSizes({{0, 3, 1}, {1, 102, 100}, {2, 203, 206}});
  {
    auto worker2_dba = worker(2).Access();
    auto vertex_ids = CollectVertexCypherIds(worker2_dba.get());
    auto edge_ids = CollectEdgeCypherIds(worker2_dba.get());
    ASSERT_TRUE(ContainsAll(vertex_ids, {original_v3_cypher_id}));
    ASSERT_TRUE(ContainsAll(edge_ids, original_e_cypher_ids));
  }

  // Migrate v3 from worker2 back to master.
  {
    auto source_dba = worker(2).Access();
    MigrateVertexAndCommit(source_dba.get(), original_v3_cypher_id,
                           master().WorkerId());
  }
  CheckGraphSizes({{0, 4, 7}, {1, 102, 100}, {2, 202, 200}});
  {
    auto master_dba = master().Access();
    auto vertex_ids = CollectVertexCypherIds(master_dba.get());
    auto edge_ids = CollectEdgeCypherIds(master_dba.get());
    ASSERT_TRUE(ContainsAll(vertex_ids, {original_v3_cypher_id}));
    ASSERT_TRUE(ContainsAll(edge_ids, original_e_cypher_ids));
  }
}

View File

@ -1,158 +0,0 @@
#include "distributed_common.hpp"
#include <memory>
#include <thread>
#include <unordered_set>
#include <vector>
#include "gtest/gtest.h"
#include "distributed/updates_rpc_clients.hpp"
#include "storage/dynamic_graph_partitioner/dgp.hpp"
using namespace distributed;
using namespace database;
DECLARE_int32(dgp_max_batch_size);
/// Fixture for the dynamic graph partitioner tests; it only names the
/// underlying distributed test database "dynamic_graph_partitioner".
class DistributedDynamicGraphPartitionerTest : public DistributedGraphDbTest {
 public:
  DistributedDynamicGraphPartitionerTest()
      : DistributedGraphDbTest("dynamic_graph_partitioner") {
  }
};
// CountLabels must report, per worker, how many edge endpoints of the vertex
// live on that worker.
TEST_F(DistributedDynamicGraphPartitionerTest, CountLabels) {
  auto on_master = InsertVertex(master());
  auto on_worker1 = InsertVertex(worker(1));
  auto on_worker2 = InsertVertex(worker(2));

  // Inserts `count` parallel "edge" edges from `from` to `to`.
  auto insert_edges = [this](const auto &from, const auto &to, int count) {
    for (int inserted = 0; inserted < count; ++inserted) {
      this->InsertEdge(from, to, "edge");
    }
  };
  insert_edges(on_master, on_master, 2);
  insert_edges(on_master, on_worker1, 3);
  insert_edges(on_master, on_worker2, 4);
  insert_edges(on_worker1, on_master, 5);
  insert_edges(on_worker2, on_master, 6);

  DynamicGraphPartitioner dgp(&master());
  auto dba = master().Access();
  VertexAccessor master_vertex(on_master, *dba);
  auto count_labels = dgp.CountLabels(master_vertex);

  // Self loops counted twice
  EXPECT_EQ(count_labels[master().WorkerId()], 2 * 2);
  EXPECT_EQ(count_labels[worker(1).WorkerId()], 3 + 5);
  EXPECT_EQ(count_labels[worker(2).WorkerId()], 4 + 6);
}
// A master vertex that is heavily connected to a vertex on worker 1 should be
// proposed for migration to worker 1.
TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMoveVertex) {
  auto master_vertex = InsertVertex(master());
  auto worker1_vertex = InsertVertex(worker(1));

  // Balance the number of nodes on workers a bit
  InsertVertex(worker(2));
  InsertVertex(worker(2));

  for (int edge_no = 0; edge_no < 100; ++edge_no) {
    InsertEdge(master_vertex, worker1_vertex, "edge");
  }

  DynamicGraphPartitioner dgp(&master());
  auto dba = master().Access();
  auto proposed = dgp.FindMigrations(*dba);

  // Expect `master_vertex` to try to move to another worker, the one
  // connected to it
  ASSERT_EQ(proposed.size(), 1);
  EXPECT_EQ(proposed[0].second, worker(1).WorkerId());
}
// With one unconnected vertex per cluster member nothing should be moved.
TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsNoChange) {
  InsertVertex(master());
  InsertVertex(worker(1));
  InsertVertex(worker(2));

  // Everything is balanced, there should be no movement
  DynamicGraphPartitioner dgp(&master());
  auto dba = master().Access();
  auto proposed = dgp.FindMigrations(*dba);
  EXPECT_EQ(proposed.size(), 0);
}
// Two master vertices are strongly tied to a vertex on worker 1, so both
// should be proposed for migration; the dgp_max_batch_size flag must cap the
// number of proposals returned.
TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMultipleAndLimit) {
  auto va = InsertVertex(master());
  auto vb = InsertVertex(master());
  auto vc = InsertVertex(worker(1));

  // Balance the number of nodes on workers a bit
  InsertVertex(worker(1));
  InsertVertex(worker(2));
  InsertVertex(worker(2));

  for (int i = 0; i < 100; ++i) InsertEdge(va, vc, "edge");
  for (int i = 0; i < 100; ++i) InsertEdge(vb, vc, "edge");

  DynamicGraphPartitioner dgp(&master());
  auto dba = master().Access();
  {
    auto migrations = dgp.FindMigrations(*dba);
    // Expect vertices to try to move to another worker
    ASSERT_EQ(migrations.size(), 2);
  }

  // See if flag affects number of returned results
  {
    // The flag is process-wide state shared by every test in this binary, so
    // remember the old value and put it back before any assertion can bail
    // out of the test body.
    const auto saved_batch_size = FLAGS_dgp_max_batch_size;
    FLAGS_dgp_max_batch_size = 1;
    auto migrations = dgp.FindMigrations(*dba);
    FLAGS_dgp_max_batch_size = saved_batch_size;
    // Expect vertices to try to move to another worker
    ASSERT_EQ(migrations.size(), 1);
  }
}
// Runs the partitioner repeatedly on a graph made of two dense halves that
// start on master and checks that the halves end up on different machines
// with only the cross edges remaining remote.
TEST_F(DistributedDynamicGraphPartitionerTest, Run) {
  // Emulate a bipartite graph with lots of connections on the left, and right
  // side, and some connections between the halves
  std::vector<storage::VertexAddress> left;
  for (int i = 0; i < 10; ++i) {
    left.push_back(InsertVertex(master()));
  }
  std::vector<storage::VertexAddress> right;
  for (int i = 0; i < 10; ++i) {
    right.push_back(InsertVertex(master()));
  }

  // Force the nodes of both sides to stay on one worker by inserting a lot of
  // edges in between them.
  // NOTE: the two rand() calls per statement are deliberately left inside the
  // argument list; their relative evaluation order is unspecified, so pulling
  // them into named locals could change which edges get created.
  for (int i = 0; i < 1000; ++i) {
    InsertEdge(left[rand() % 10], left[rand() % 10], "edge");
    InsertEdge(right[rand() % 10], right[rand() % 10], "edge");
  }

  // Insert edges between left and right side
  for (int i = 0; i < 50; ++i)
    InsertEdge(left[rand() % 10], right[rand() % 10], "edge");

  // Balance it out so that the vertices count on workers don't influence the
  // partitioning too much
  for (int i = 0; i < 10; ++i) InsertVertex(worker(2));

  DynamicGraphPartitioner dgp(&master());
  // Transfer one by one to actually converge. The flag is process-wide state,
  // so the previous value is restored once the partitioner runs are done.
  const auto saved_batch_size = FLAGS_dgp_max_batch_size;
  FLAGS_dgp_max_batch_size = 1;
  // Try a bit more transfers to see if we reached a steady state
  for (int i = 0; i < 15; ++i) {
    dgp.Run();
  }
  FLAGS_dgp_max_batch_size = saved_batch_size;

  EXPECT_EQ(VertexCount(master()), 10);
  EXPECT_EQ(VertexCount(worker(1)), 10);

  // Counts edge endpoints, seen from the vertices visible through `dba`,
  // whose other end lives on a different machine.
  auto CountRemotes = [](GraphDbAccessor &dba) {
    int64_t cnt = 0;
    for (auto vertex : dba.Vertices(false)) {
      for (auto edge : vertex.in())
        if (edge.from_addr().is_remote()) ++cnt;
      for (auto edge : vertex.out())
        if (edge.to_addr().is_remote()) ++cnt;
    }
    return cnt;
  };

  auto dba_m = master().Access();
  auto dba_w1 = worker(1).Access();
  EXPECT_EQ(CountRemotes(*dba_m), 50);
  EXPECT_EQ(CountRemotes(*dba_w1), 50);
}

View File

@ -77,7 +77,8 @@ TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertex) {
gid::Gid gid;
{
auto dba = worker(1).Access();
auto v = database::InsertVertexIntoRemote(dba.get(), 2, {}, {});
auto v = database::InsertVertexIntoRemote(dba.get(), 2, {}, {},
std::experimental::nullopt);
gid = v.gid();
dba->Commit();
}
@ -93,7 +94,8 @@ TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertexWithUpdate) {
storage::Property prop;
{
auto dba = worker(1).Access();
auto v = database::InsertVertexIntoRemote(dba.get(), 2, {}, {});
auto v = database::InsertVertexIntoRemote(dba.get(), 2, {}, {},
std::experimental::nullopt);
gid = v.gid();
prop = dba->Property("prop");
v.PropsSet(prop, 42);
@ -118,8 +120,8 @@ TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertexWithData) {
l1 = dba->Label("l1");
l2 = dba->Label("l2");
prop = dba->Property("prop");
auto v =
database::InsertVertexIntoRemote(dba.get(), 2, {l1, l2}, {{prop, 42}});
auto v = database::InsertVertexIntoRemote(
dba.get(), 2, {l1, l2}, {{prop, 42}}, std::experimental::nullopt);
gid = v.gid();
// Check local visibility before commit.

View File

@ -1,166 +0,0 @@
#include "distributed_common.hpp"
#include <memory>
#include <thread>
#include <unordered_set>
#include "gtest/gtest.h"
#include "distributed/updates_rpc_clients.hpp"
#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
using namespace distributed;
using namespace database;
/// Fixture shared by the vertex migrator tests below. It only forwards the
/// name "vertex_migrator" to the DistributedGraphDbTest base fixture.
class DistributedVertexMigratorTest : public DistributedGraphDbTest {
 public:
  DistributedVertexMigratorTest()
      : DistributedGraphDbTest("vertex_migrator") {}
};
// Verifies that the cypher ids of a vertex and of its self-loop edge are the
// same after the vertex has been migrated from the master to worker 1.
TEST_F(DistributedVertexMigratorTest, VertexEdgeGidSaved) {
  // Pre-populate the master so that the ids handed out there differ from the
  // ones worker 1 would hand out on its own.
  for (int filler = 0; filler < 10; ++filler) {
    auto filler_vertex = InsertVertex(master());
    InsertEdge(filler_vertex, filler_vertex, "edge");
  }

  // The vertex under test, with a single edge pointing back at itself.
  auto va = InsertVertex(master());
  auto ea = InsertEdge(va, va, "edge");

  // Record the cypher ids as seen on the master before the migration.
  database::GraphDbAccessor read_dba(master());
  VertexAccessor vertex_before(va, read_dba);
  EdgeAccessor edge_before(ea, read_dba);
  auto expected_vertex_id = vertex_before.cypher_id();
  auto expected_edge_id = edge_before.cypher_id();

  {
    database::GraphDbAccessor migrate_dba(master());
    VertexAccessor to_migrate(va, migrate_dba);
    VertexMigrator migrator(&migrate_dba);
    migrator.MigrateVertex(to_migrate, worker(1).WorkerId());
    {
      // The futures are waited on when they go out of scope, so the updates
      // are applied before the commit below.
      auto pending_applies = master().updates_clients().UpdateApplyAll(
          master().WorkerId(), migrate_dba.transaction().id_);
    }
    migrate_dba.Commit();
  }

  ASSERT_EQ(VertexCount(worker(1)), 1);
  {
    database::GraphDbAccessor worker_dba(worker(1));
    auto vertex_after = *worker_dba.Vertices(false).begin();
    auto edge_after = *worker_dba.Edges(false).begin();
    EXPECT_EQ(vertex_after.cypher_id(), expected_vertex_id);
    EXPECT_EQ(edge_after.cypher_id(), expected_edge_id);
  }
}
// Migrates two connected vertices from the master to worker 1 and verifies
// that an edge starting at a vertex on worker 2 points to worker 1 after the
// transfer.
TEST_F(DistributedVertexMigratorTest, SomeTransfer) {
  auto va = InsertVertex(master());
  auto vb = InsertVertex(master());
  auto vc = InsertVertex(worker(2));
  InsertEdge(va, vb, "edge");
  InsertEdge(vc, va, "edge");

  {
    database::GraphDbAccessor master_dba(master());
    VertexMigrator migrator(&master_dba);
    // Move every vertex returned by the master accessor to worker 1.
    for (auto &candidate : master_dba.Vertices(false)) {
      migrator.MigrateVertex(candidate, worker(1).WorkerId());
    }
    {
      // The futures are waited on when they go out of scope, so the updates
      // are applied before the commit below.
      auto pending_applies = master().updates_clients().UpdateApplyAll(
          master().WorkerId(), master_dba.transaction().id_);
    }
    master_dba.Commit();
  }

  // The master must be empty; both vertices and their shared edge are now on
  // worker 1, while worker 2 keeps its vertex and its outgoing edge.
  EXPECT_EQ(VertexCount(master()), 0);
  EXPECT_EQ(EdgeCount(master()), 0);
  EXPECT_EQ(VertexCount(worker(1)), 2);
  EXPECT_EQ(EdgeCount(worker(1)), 1);
  EXPECT_EQ(VertexCount(worker(2)), 1);
  ASSERT_EQ(EdgeCount(worker(2)), 1);

  {
    database::GraphDbAccessor worker2_dba(worker(2));
    auto remaining_edge = *worker2_dba.Edges(false).begin();
    // The destination of the edge kept on worker 2 must have been updated.
    EXPECT_EQ(remaining_edge.to_addr().worker_id(), worker(1).WorkerId());
  }
}
// A self-loop edge appears in both the in and the out edges of its vertex.
// Verifies that such an edge is transferred exactly once when the vertex is
// migrated.
TEST_F(DistributedVertexMigratorTest, EdgeCycle) {
  auto va = InsertVertex(master());
  InsertEdge(va, va, "edge");

  {
    database::GraphDbAccessor master_dba(master());
    VertexMigrator migrator(&master_dba);
    for (auto &candidate : master_dba.Vertices(false)) {
      migrator.MigrateVertex(candidate, worker(1).WorkerId());
    }
    {
      // The futures are waited on when they go out of scope, so the updates
      // are applied before the commit below.
      auto pending_applies = master().updates_clients().UpdateApplyAll(
          master().WorkerId(), master_dba.transaction().id_);
    }
    master_dba.Commit();
  }

  // Nothing is left on the master; worker 1 has one vertex and one edge.
  EXPECT_EQ(VertexCount(master()), 0);
  EXPECT_EQ(EdgeCount(master()), 0);
  EXPECT_EQ(VertexCount(worker(1)), 1);
  EXPECT_EQ(EdgeCount(worker(1)), 1);
}
// Verifies that vertex labels, vertex properties, edge properties and the
// edge type are all present on worker 1 after migrating from the master.
TEST_F(DistributedVertexMigratorTest, TransferLabelsAndProperties) {
  // Build two labelled vertices on the master, connected in both directions.
  {
    database::GraphDbAccessor setup_dba(master());
    auto va = setup_dba.InsertVertex();
    auto vb = setup_dba.InsertVertex();
    va.add_label(setup_dba.Label("l"));
    vb.add_label(setup_dba.Label("l"));
    va.PropsSet(setup_dba.Property("p"), 42);
    vb.PropsSet(setup_dba.Property("p"), 42);
    auto forward_edge = setup_dba.InsertEdge(va, vb, setup_dba.EdgeType("edge"));
    forward_edge.PropsSet(setup_dba.Property("pe"), 43);
    auto backward_edge =
        setup_dba.InsertEdge(vb, va, setup_dba.EdgeType("edge"));
    backward_edge.PropsSet(setup_dba.Property("pe"), 43);
    setup_dba.Commit();
  }

  // Migrate everything the master accessor returns to worker 1.
  {
    database::GraphDbAccessor master_dba(master());
    VertexMigrator migrator(&master_dba);
    for (auto &candidate : master_dba.Vertices(false)) {
      migrator.MigrateVertex(candidate, worker(1).WorkerId());
    }
    {
      // The futures are waited on when they go out of scope, so the updates
      // are applied before the commit below.
      auto pending_applies = master().updates_clients().UpdateApplyAll(
          master().WorkerId(), master_dba.transaction().id_);
    }
    master_dba.Commit();
  }

  // Inspect what arrived on worker 1.
  {
    database::GraphDbAccessor worker_dba(worker(1));
    EXPECT_EQ(VertexCount(master()), 0);
    ASSERT_EQ(VertexCount(worker(1)), 2);
    for (auto migrated : worker_dba.Vertices(false)) {
      ASSERT_EQ(migrated.labels().size(), 1);
      EXPECT_EQ(migrated.labels()[0], worker_dba.Label("l"));
      EXPECT_EQ(migrated.PropsAt(worker_dba.Property("p")).Value<int64_t>(),
                42);
    }
    ASSERT_EQ(EdgeCount(worker(1)), 2);
    // Only the first edge returned by the accessor is inspected.
    auto first_edge = *worker_dba.Edges(false).begin();
    EXPECT_EQ(first_edge.PropsAt(worker_dba.Property("pe")).Value<int64_t>(),
              43);
    EXPECT_EQ(first_edge.EdgeType(), worker_dba.EdgeType("edge"));
  }
}

View File

@ -1148,8 +1148,8 @@ class QueryPlanExpandBfs
vertex.PropsSet(prop.second, id);
v.push_back(vertex.GlobalAddress());
} else {
auto vertex = database::InsertVertexIntoRemote(&dba, worker, {},
{{prop.second, id}});
auto vertex = database::InsertVertexIntoRemote(
&dba, worker, {}, {{prop.second, id}}, std::experimental::nullopt);
v.push_back(vertex.GlobalAddress());
}
}

View File

@ -19,7 +19,7 @@ TEST(StateDelta, CreateVertex) {
auto dba = db.Access();
auto vertex = dba->FindVertexOptional(gid0, false);
EXPECT_TRUE(vertex);
EXPECT_EQ(vertex->cypher_id(), 0);
EXPECT_EQ(vertex->CypherId(), 0);
}
}
@ -137,8 +137,8 @@ TEST(StateDelta, RemoveLabel) {
}
{
auto dba = db.Access();
auto delta = database::StateDelta::RemoveLabel(dba->transaction_id(), gid0,
dba->Label("label"), "label");
auto delta = database::StateDelta::RemoveLabel(
dba->transaction_id(), gid0, dba->Label("label"), "label");
delta.Apply(*dba);
dba->Commit();
}

View File

@ -6,7 +6,7 @@
#include "stats/stats_rpc_messages.hpp"
#include "utils/string.hpp"
// TODO (buda): move this logic to a unit test
// TODO (buda): Move this logic to a unit test.
bool parse_input(const std::string &s, std::string &metric_path,
std::vector<std::pair<std::string, std::string>> &tags,