Implement graph element rpc
Summary:
- End to end distributed GraphDb testing
- Refactors as necessary
- Basic RemoteCache for storing remote data
- RemoteDataRpc

As we are on a tight schedule, let's focus on the essentials: functionality and proper testing.

Reviewers: dgleich, teon.banek, buda

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1121
This commit is contained in:
parent
252018ab22
commit
e1e4a70714
src
communication/rpc
database
distributed
coordination.hpp
coordination_master.cpp
coordination_master.hpp
coordination_worker.hpp
remote_cache.hpp
remote_data_rpc_clients.hpp
remote_data_rpc_messages.hpp
remote_data_rpc_server.hpp
storage
transactions
utils
tests/unit
@@ -3,6 +3,7 @@
 #include "boost/serialization/export.hpp"

 #include "distributed/coordination_rpc_messages.hpp"
+#include "distributed/remote_data_rpc_messages.hpp"
 #include "storage/concurrent_id_mapper_rpc_messages.hpp"
 #include "transactions/engine_rpc_messages.hpp"

@@ -27,9 +28,17 @@ ID_VALUE_EXPORT_BOOST_TYPE(Property)

 #undef ID_VALUE_EXPORT_BOOST_TYPE

 // Distributed coordination.
 BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq);
 BOOST_CLASS_EXPORT(distributed::RegisterWorkerRes);
 BOOST_CLASS_EXPORT(distributed::GetEndpointReq);
 BOOST_CLASS_EXPORT(distributed::GetEndpointRes);
 BOOST_CLASS_EXPORT(distributed::StopWorkerReq);
 BOOST_CLASS_EXPORT(distributed::StopWorkerRes);

+// Distributed data exchange.
+BOOST_CLASS_EXPORT(distributed::RemoteEdgeReq);
+BOOST_CLASS_EXPORT(distributed::RemoteEdgeRes);
+BOOST_CLASS_EXPORT(distributed::RemoteVertexReq);
+BOOST_CLASS_EXPORT(distributed::RemoteVertexRes);
+BOOST_CLASS_EXPORT(distributed::TxGidPair);
@@ -1,7 +1,11 @@
-#include "database/graph_db.hpp"
 #include "glog/logging.h"

+#include "communication/messaging/distributed.hpp"
+#include "database/graph_db.hpp"
+#include "distributed/coordination_master.hpp"
+#include "distributed/coordination_worker.hpp"
+#include "distributed/remote_data_rpc_clients.hpp"
+#include "distributed/remote_data_rpc_server.hpp"
 #include "durability/paths.hpp"
 #include "durability/recovery.hpp"
 #include "durability/snapshooter.hpp"

@@ -64,36 +68,61 @@ struct TypemapPack {
 class SingleNode : public PrivateBase {
  public:
   explicit SingleNode(const Config &config) : PrivateBase(config) {}
+  GraphDb::Type type() const override { return GraphDb::Type::SINGLE_NODE; }
   IMPL_GETTERS

  private:
   tx::SingleNodeEngine tx_engine_{&wal_};
   StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
   TypemapPack<SingleNodeConcurrentIdMapper> typemap_pack_;
   database::SingleNodeCounters counters_;
+  distributed::RemoteDataRpcServer &remote_data_server() override {
+    LOG(FATAL) << "Remote data server not available in single-node.";
+  }
+  distributed::RemoteDataRpcClients &remote_data_clients() override {
+    LOG(FATAL) << "Remote data clients not available in single-node.";
+  }
 };

+#define IMPL_DISTRIBUTED_GETTERS                                      \
+  distributed::RemoteDataRpcServer &remote_data_server() override {   \
+    return remote_data_server_;                                       \
+  }                                                                   \
+  distributed::RemoteDataRpcClients &remote_data_clients() override { \
+    return remote_data_clients_;                                      \
+  }
+
 class Master : public PrivateBase {
  public:
   explicit Master(const Config &config) : PrivateBase(config) {}
+  GraphDb::Type type() const override {
+    return GraphDb::Type::DISTRIBUTED_MASTER;
+  }
   IMPL_GETTERS
+  IMPL_DISTRIBUTED_GETTERS

  private:
   communication::messaging::System system_{config_.master_endpoint};
   tx::MasterEngine tx_engine_{system_, &wal_};
   StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
-  distributed::MasterCoordination coordination{system_};
+  distributed::MasterCoordination coordination_{system_};
   TypemapPack<MasterConcurrentIdMapper> typemap_pack_{system_};
   database::MasterCounters counters_{system_};
+  distributed::RemoteDataRpcServer remote_data_server_{*this, system_};
+  distributed::RemoteDataRpcClients remote_data_clients_{system_,
+                                                         coordination_};
 };

 class Worker : public PrivateBase {
  public:
-  explicit Worker(const Config &config) : PrivateBase(config) {}
-  IMPL_GETTERS
-  void WaitForShutdown() { coordination_.WaitForShutdown(); }
+  explicit Worker(const Config &config) : PrivateBase(config) {
+    coordination_.RegisterWorker(config.worker_id);
+  }
+
+  GraphDb::Type type() const override {
+    return GraphDb::Type::DISTRIBUTED_WORKER;
+  }
   IMPL_GETTERS
+  IMPL_DISTRIBUTED_GETTERS

  private:
   communication::messaging::System system_{config_.worker_endpoint};
   distributed::WorkerCoordination coordination_{system_,
                                                 config_.master_endpoint};

@@ -102,6 +131,9 @@ class Worker : public PrivateBase {
   TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{system_,
                                                       config_.master_endpoint};
   database::WorkerCounters counters_{system_, config_.master_endpoint};
+  distributed::RemoteDataRpcServer remote_data_server_{*this, system_};
+  distributed::RemoteDataRpcClients remote_data_clients_{system_,
+                                                         coordination_};
 };

 #undef IMPL_GETTERS

@@ -127,6 +159,7 @@ PublicBase::~PublicBase() {
   if (impl_->config_.snapshot_on_exit) MakeSnapshot();
 }

+GraphDb::Type PublicBase::type() const { return impl_->type(); }
 Storage &PublicBase::storage() { return impl_->storage(); }
 durability::WriteAheadLog &PublicBase::wal() { return impl_->wal(); }
 tx::Engine &PublicBase::tx_engine() { return impl_->tx_engine(); }

@@ -142,6 +175,12 @@ ConcurrentIdMapper<Property> &PublicBase::property_mapper() {
 database::Counters &PublicBase::counters() { return impl_->counters(); }
 void PublicBase::CollectGarbage() { impl_->CollectGarbage(); }
 int PublicBase::WorkerId() const { return impl_->WorkerId(); }
+distributed::RemoteDataRpcServer &PublicBase::remote_data_server() {
+  return impl_->remote_data_server();
+}
+distributed::RemoteDataRpcClients &PublicBase::remote_data_clients() {
+  return impl_->remote_data_clients();
+}

 void PublicBase::MakeSnapshot() {
   const bool status = durability::MakeSnapshot(

@@ -187,10 +226,28 @@ SingleNode::SingleNode(Config config)
 Master::Master(Config config)
     : MasterBase(std::make_unique<impl::Master>(config)) {}

+io::network::Endpoint Master::endpoint() const {
+  return dynamic_cast<impl::Master *>(impl_.get())->system_.endpoint();
+}
+
+io::network::Endpoint Master::GetEndpoint(int worker_id) {
+  return dynamic_cast<impl::Master *>(impl_.get())
+      ->coordination_.GetEndpoint(worker_id);
+}
+
 Worker::Worker(Config config)
     : PublicBase(std::make_unique<impl::Worker>(config)) {}

+io::network::Endpoint Worker::endpoint() const {
+  return dynamic_cast<impl::Worker *>(impl_.get())->system_.endpoint();
+}
+
+io::network::Endpoint Worker::GetEndpoint(int worker_id) {
+  return dynamic_cast<impl::Worker *>(impl_.get())
+      ->coordination_.GetEndpoint(worker_id);
+}
+
 void Worker::WaitForShutdown() {
-  dynamic_cast<impl::Worker *>(impl_.get())->WaitForShutdown();
+  dynamic_cast<impl::Worker *>(impl_.get())->coordination_.WaitForShutdown();
 }
 }  // namespace database
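A minimal wiring sketch for the new public API, mirroring the unit test fixture added at the bottom of this commit (the endpoints and worker id here are illustrative, not part of the diff):

// Port 0 means "pick any free port"; worker 0 is implicitly the master.
database::Config master_config;
master_config.master_endpoint = {"127.0.0.1", 0};
database::Master master{master_config};

database::Config worker_config;
worker_config.worker_id = 1;
worker_config.master_endpoint = master.endpoint();
worker_config.worker_endpoint = {"127.0.0.1", 0};
database::Worker worker{worker_config};  // RegisterWorker(1) runs in the ctor

worker.WaitForShutdown();  // blocks until the master issues StopWorkerRpc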
@@ -3,9 +3,6 @@
 #include <atomic>
 #include <memory>

-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
 #include "database/counters.hpp"
 #include "database/storage.hpp"
 #include "database/storage_gc.hpp"

@@ -16,6 +13,11 @@
 #include "transactions/engine.hpp"
 #include "utils/scheduler.hpp"

+namespace distributed {
+class RemoteDataRpcServer;
+class RemoteDataRpcClients;
+}
+
 namespace database {

 /// Database configuration. Initialized from flags, but modifiable.

@@ -64,9 +66,16 @@ struct Config {
  */
 class GraphDb {
  public:
+  enum class Type {
+    SINGLE_NODE,
+    DISTRIBUTED_MASTER,
+    DISTRIBUTED_WORKER
+  };
+
   GraphDb() {}
   virtual ~GraphDb() {}

+  virtual Type type() const = 0;
   virtual Storage &storage() = 0;
   virtual durability::WriteAheadLog &wal() = 0;
   virtual tx::Engine &tx_engine() = 0;

@@ -78,6 +87,10 @@ class GraphDb {
   virtual void CollectGarbage() = 0;
   virtual int WorkerId() const = 0;

+  // Supported only in distributed master and worker, not in single-node.
+  virtual distributed::RemoteDataRpcServer &remote_data_server() = 0;
+  virtual distributed::RemoteDataRpcClients &remote_data_clients() = 0;
+
   GraphDb(const GraphDb &) = delete;
   GraphDb(GraphDb &&) = delete;
   GraphDb &operator=(const GraphDb &) = delete;

@@ -94,6 +107,7 @@ class PrivateBase;
 // initialization and cleanup.
 class PublicBase : public GraphDb {
  public:
+  Type type() const override;
   Storage &storage() override;
   durability::WriteAheadLog &wal() override;
   tx::Engine &tx_engine() override;

@@ -103,6 +117,8 @@ class PublicBase : public GraphDb {
   database::Counters &counters() override;
   void CollectGarbage() override;
   int WorkerId() const override;
+  distributed::RemoteDataRpcServer &remote_data_server() override;
+  distributed::RemoteDataRpcClients &remote_data_clients() override;

  protected:
   explicit PublicBase(std::unique_ptr<PrivateBase> impl);

@@ -138,11 +154,21 @@ class SingleNode : public MasterBase {
 class Master : public MasterBase {
  public:
   explicit Master(Config config = Config());
+  /** Gets this master's endpoint. */
+  io::network::Endpoint endpoint() const;
+  /** Gets the endpoint of the worker with the given id. */
+  // TODO make const once Coordination::GetEndpoint is const.
+  io::network::Endpoint GetEndpoint(int worker_id);
 };

 class Worker : public impl::PublicBase {
  public:
   explicit Worker(Config config = Config());
+  /** Gets this worker's endpoint. */
+  io::network::Endpoint endpoint() const;
+  /** Gets the endpoint of the worker with the given id. */
+  // TODO make const once Coordination::GetEndpoint is const.
+  io::network::Endpoint GetEndpoint(int worker_id);
   void WaitForShutdown();
 };
 }  // namespace database
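Because remote_data_server()/remote_data_clients() LOG(FATAL) on a single-node instance, callers are expected to branch on the runtime type first. A small sketch of that pattern (the function name is illustrative, not from this commit):

void SetUpRemoteCaches(database::GraphDb &db) {
  // Only distributed master/worker expose the remote-data RPC objects.
  if (db.type() != database::GraphDb::Type::SINGLE_NODE) {
    distributed::RemoteDataRpcClients &clients = db.remote_data_clients();
    // ... hand `clients` to a distributed::RemoteCache, as GraphDbAccessor
    // now does in its constructors.
    (void)clients;
  }
}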
@@ -12,13 +12,27 @@
 namespace database {

 GraphDbAccessor::GraphDbAccessor(GraphDb &db)
-    : db_(db), transaction_(*SingleNodeEngine().Begin()) {}
+    : db_(db),
+      transaction_(*SingleNodeEngine().Begin()),
+      transaction_starter_{true} {
+  if (db_.type() != GraphDb::Type::SINGLE_NODE) {
+    remote_vertices_.emplace(db_.remote_data_clients());
+    remote_edges_.emplace(db_.remote_data_clients());
+  }
+}

 GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id)
-    : db_(db), transaction_(*db.tx_engine().RunningTransaction(tx_id)) {}
+    : db_(db),
+      transaction_(*db.tx_engine().RunningTransaction(tx_id)),
+      transaction_starter_{false} {
+  if (db_.type() != GraphDb::Type::SINGLE_NODE) {
+    remote_vertices_.emplace(db_.remote_data_clients());
+    remote_edges_.emplace(db_.remote_data_clients());
+  }
+}

 GraphDbAccessor::~GraphDbAccessor() {
-  if (!commited_ && !aborted_) {
+  if (transaction_starter_ && !commited_ && !aborted_) {
     this->Abort();
   }
 }

@@ -78,6 +92,13 @@ std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertex(
   return record_accessor;
 }

+VertexAccessor GraphDbAccessor::FindVertexChecked(gid::Gid gid,
+                                                  bool current_state) {
+  auto found = FindVertex(gid, current_state);
+  CHECK(found) << "Unable to find vertex for id: " << gid;
+  return *found;
+}
+
 std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdge(
     gid::Gid gid, bool current_state) {
   auto collection_accessor = db_.storage().edges_.access();

@@ -89,6 +110,13 @@ std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdge(
   return record_accessor;
 }

+EdgeAccessor GraphDbAccessor::FindEdgeChecked(gid::Gid gid,
+                                              bool current_state) {
+  auto found = FindEdge(gid, current_state);
+  CHECK(found) << "Unable to find edge for id: " << gid;
+  return *found;
+}
+
 void GraphDbAccessor::BuildIndex(storage::Label label,
                                  storage::Property property) {
   DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";

@@ -433,16 +461,16 @@ std::vector<std::string> GraphDbAccessor::IndexInfo() const {
   }
   return info;
 }
-auto &GraphDbAccessor::remote_vertices() { return remote_vertices_; }
-auto &GraphDbAccessor::remote_edges() { return remote_edges_; }
+auto &GraphDbAccessor::remote_vertices() { return *remote_vertices_; }
+auto &GraphDbAccessor::remote_edges() { return *remote_edges_; }

 template <>
-GraphDbAccessor::RemoteCache<Vertex> &GraphDbAccessor::remote_elements() {
+distributed::RemoteCache<Vertex> &GraphDbAccessor::remote_elements() {
   return remote_vertices();
 }

 template <>
-GraphDbAccessor::RemoteCache<Edge> &GraphDbAccessor::remote_elements() {
+distributed::RemoteCache<Edge> &GraphDbAccessor::remote_elements() {
   return remote_edges();
 }
 }  // namespace database
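The transaction_starter_ flag prevents a worker-side accessor from aborting a transaction it merely joined. A sketch of the intended pattern, assuming a master and worker GraphDb as in the new test (master_db/worker_db are illustrative names):

// Master-side accessor begins the transaction (transaction_starter_ == true).
database::GraphDbAccessor master_dba{master_db};

{
  // Worker-side accessor attaches to the same transaction by id
  // (transaction_starter_ == false), so its destructor will not Abort it.
  database::GraphDbAccessor worker_dba{worker_db, master_dba.transaction_id()};
  // ... read remote data ...
}  // no Abort() here

master_dba.Commit();  // only the starter decides the transaction's fate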
@@ -8,6 +8,7 @@
 #include "glog/logging.h"

 #include "database/graph_db.hpp"
+#include "distributed/remote_cache.hpp"
 #include "storage/edge_accessor.hpp"
 #include "storage/types.hpp"
 #include "storage/vertex_accessor.hpp"

@@ -38,60 +39,6 @@ class GraphDbAccessor {
   friend class ::VertexAccessor;
   friend class ::EdgeAccessor;

-  /**
-   * Used for caching Vertices and Edges that are stored on another worker in a
-   * distributed system. Maps global IDs to (old, new) Vertex/Edge pointer
-   * pairs. It is possible that either "old" or "new" are nullptrs, but at
-   * least one must be not-null. The RemoteCache is the owner of TRecord
-   * objects it points to.
-   *
-   * @tparam TRecord - Edge or Vertex
-   */
-  template <typename TRecord>
-  class RemoteCache {
-   public:
-    ~RemoteCache() {
-      for (const auto &pair : cache_) {
-        delete pair.second.first;
-        delete pair.second.second;
-      }
-    }
-
-    /**
-     * Returns the "new" Vertex/Edge for the given gid.
-     *
-     * @param gid - global ID.
-     * @param init_if_necessary - If "new" is not initialized and this flag is
-     * set, then "new" is initialized with a copy of "old" before returning.
-     */
-    TRecord *FindNew(gid::Gid gid, bool init_if_necessary) {
-      auto found = cache_.find(gid);
-      DCHECK(found != cache_.end()) << "Uninitialized remote Vertex/Edge";
-      auto &pair = found->second;
-      if (!pair.second && init_if_necessary) {
-        pair.second = pair.first->CloneData();
-      }
-      return pair.second;
-    }
-
-    /**
-     * For the Vertex/Edge with the given global ID, looks for the data visible
-     * from the given transaction's ID and command ID, and caches it. Sets the
-     * given pointers to point to the fetched data. Analogue to
-     * mvcc::VersionList::find_set_old_new.
-     */
-    void FindSetOldNew(const tx::Transaction &, gid::Gid, TRecord *&,
-                       TRecord *&) {
-      LOG(ERROR) << "Remote data storage not yet implemented";
-      // TODO fetch data for (gid, t.id_, t.cmd_id()) from remote worker.
-      // Set that data in the cache.
-      // Set the pointers to the new data.
-    }
-
-   private:
-    std::unordered_map<gid::Gid, std::pair<TRecord *, TRecord *>> cache_;
-  };
-
  public:
   /** Creates a new accessor by starting a new transaction. Only applicable to
    * the single-node or distributed master. */

@@ -160,6 +107,9 @@ class GraphDbAccessor {
   std::experimental::optional<VertexAccessor> FindVertex(gid::Gid gid,
                                                          bool current_state);

+  /** Like `FindVertex`, but performs a CHECK that the result is found. */
+  VertexAccessor FindVertexChecked(gid::Gid gid, bool current_state);
+
   /**
    * Returns iterable over accessors to all the vertices in the graph
    * visible to the current transaction.

@@ -351,6 +301,10 @@ class GraphDbAccessor {
   std::experimental::optional<EdgeAccessor> FindEdge(gid::Gid gid,
                                                      bool current_state);

+  /** Like `FindEdge`, but performs a CHECK that the result is found. */
+  EdgeAccessor FindEdgeChecked(gid::Gid gid, bool current_state);
+
   /**
    * Returns iterable over accessors to all the edges in the graph
    * visible to the current transaction.

@@ -583,17 +537,20 @@ class GraphDbAccessor {

   /** Gets remote_vertices or remote_edges, depending on type param. */
   template <typename TRecord>
-  RemoteCache<TRecord> &remote_elements();
+  distributed::RemoteCache<TRecord> &remote_elements();

  private:
   GraphDb &db_;
   tx::Transaction &transaction_;
+  // Indicates if this db-accessor started the transaction and should Abort it
+  // upon destruction.
+  bool transaction_starter_;

   bool commited_{false};
   bool aborted_{false};

-  RemoteCache<Vertex> remote_vertices_;
-  RemoteCache<Edge> remote_edges_;
+  std::experimental::optional<distributed::RemoteCache<Vertex>> remote_vertices_;
+  std::experimental::optional<distributed::RemoteCache<Edge>> remote_edges_;

   /** Casts the transaction engine to SingleNodeEngine and returns it. If the
    * engine is a WorkerEngine (and not SingleNode nor Master), a call to this

src/distributed/coordination.hpp (new file, 16 lines)
@@ -0,0 +1,16 @@
#pragma once

#include "io/network/endpoint.hpp"

namespace distributed {

/** API for the distributed coordination class. */
class Coordination {
 public:
  virtual ~Coordination() {}

  /** Gets the endpoint for the given worker ID from the master. */
  virtual io::network::Endpoint GetEndpoint(int worker_id) = 0;
};

}  // namespace distributed
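The Coordination interface exists so RemoteDataRpcClients can resolve endpoints without caring whether it runs on the master or a worker. A hypothetical test double (not part of this commit) shows the minimal contract:

class FixedCoordination : public distributed::Coordination {
 public:
  explicit FixedCoordination(io::network::Endpoint endpoint)
      : endpoint_(endpoint) {}
  // Every worker id resolves to the same endpoint; enough for a one-worker test.
  io::network::Endpoint GetEndpoint(int) override { return endpoint_; }

 private:
  io::network::Endpoint endpoint_;
};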
@@ -5,6 +5,9 @@ namespace distributed {

 MasterCoordination::MasterCoordination(communication::messaging::System &system)
     : system_(system), server_(system, kCoordinationServerName) {
+  // The master is always worker 0.
+  workers_.emplace(0, system.endpoint());
+
   server_.Register<RegisterWorkerRpc>([this](const RegisterWorkerReq &req) {
     auto worker_id = RegisterWorker(req.desired_worker_id, req.endpoint);
     return std::make_unique<RegisterWorkerRes>(worker_id);

@@ -17,16 +20,19 @@ MasterCoordination::MasterCoordination(communication::messaging::System &system)
 int MasterCoordination::RegisterWorker(int desired_worker_id,
                                        Endpoint endpoint) {
   std::lock_guard<std::mutex> guard(lock_);
-  int worker_id = desired_worker_id;
-  // Check if the desired ID is available.
-  if (workers_.find(worker_id) != workers_.end()) {
-    if (desired_worker_id >= 0)
-      LOG(WARNING) << "Unable to assign requested ID (" << worker_id
-                   << ") to worker at \"" << endpoint.address() << ":"
-                   << endpoint.port() << "\"";
-    worker_id = 1;
+
+  // If there is a desired worker ID, try to set it.
+  if (desired_worker_id >= 0) {
+    if (workers_.find(desired_worker_id) == workers_.end()) {
+      workers_.emplace(desired_worker_id, endpoint);
+      return desired_worker_id;
+    }
+    LOG(WARNING) << "Unable to assign requested ID (" << desired_worker_id
+                 << ") to worker at: " << endpoint;
+  }
+
+  // Look for the next ID that's not used.
+  int worker_id = 1;
   while (workers_.find(worker_id) != workers_.end()) ++worker_id;
   workers_.emplace(worker_id, endpoint);
   return worker_id;

@@ -35,6 +41,8 @@ int MasterCoordination::RegisterWorker(int desired_worker_id,
 MasterCoordination::~MasterCoordination() {
   std::lock_guard<std::mutex> guard(lock_);
   for (const auto &kv : workers_) {
+    // Skip master (self).
+    if (kv.first == 0) continue;
     communication::rpc::Client client(system_, kv.second,
                                       kCoordinationServerName);
     auto result = client.Call<StopWorkerRpc>(100ms);

@@ -42,11 +50,11 @@ MasterCoordination::~MasterCoordination() {
   }
 }

-Endpoint MasterCoordination::GetEndpoint(int worker_id) const {
+Endpoint MasterCoordination::GetEndpoint(int worker_id) {
   std::lock_guard<std::mutex> guard(lock_);
   auto found = workers_.find(worker_id);
-  CHECK(found != workers_.end())
-      << "No endpoint registered for worker id: " << worker_id;
+  CHECK(found != workers_.end()) << "No endpoint registered for worker id: "
+                                 << worker_id;
   return found->second;
 }
 }  // namespace distributed
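The assignment policy after this change, illustrated (return values assume the calls happen in this order on a fresh master, which already holds id 0 for itself; the endpoint variables are placeholders):

// distributed::MasterCoordination coord{system};
int a = coord.RegisterWorker(5, ep1);   // -> 5  (desired id is free)
int b = coord.RegisterWorker(5, ep2);   // -> 1  (5 taken; smallest free positive id)
int c = coord.RegisterWorker(-1, ep3);  // -> 2  (no desired id; next free one)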
@@ -5,6 +5,7 @@

 #include "communication/messaging/distributed.hpp"
 #include "communication/rpc/rpc.hpp"
+#include "distributed/coordination.hpp"
 #include "io/network/endpoint.hpp"

 namespace distributed {

@@ -12,7 +13,7 @@ using Endpoint = io::network::Endpoint;

 /** Handles worker registration, getting of other workers' endpoints and
  * coordinated shutdown in a distributed memgraph. Master side. */
-class MasterCoordination {
+class MasterCoordination : public Coordination {
  /**
   * Registers a new worker with this master server. Notifies all the known
   * workers of the new worker.

@@ -32,7 +33,7 @@ class MasterCoordination {
   ~MasterCoordination();

   /** Returns the Endpoint for the given worker_id. */
-  Endpoint GetEndpoint(int worker_id) const;
+  Endpoint GetEndpoint(int worker_id) override;

  private:
   communication::messaging::System &system_;
@@ -1,15 +1,16 @@
 #pragma once

 #include "data_structures/concurrent/concurrent_map.hpp"
+#include "distributed/coordination.hpp"
 #include "distributed/coordination_rpc_messages.hpp"
 #include "io/network/endpoint.hpp"

 namespace distributed {
-using Endpoint = io::network::Endpoint;

 /** Handles worker registration, getting of other workers' endpoints and
  * coordinated shutdown in a distributed memgraph. Worker side. */
-class WorkerCoordination {
+class WorkerCoordination : public Coordination {
+  using Endpoint = io::network::Endpoint;

  public:
   WorkerCoordination(communication::messaging::System &system,
                      const Endpoint &master_endpoint);

@@ -23,7 +24,7 @@ class WorkerCoordination {
   int RegisterWorker(int desired_worker_id = -1);

   /** Gets the endpoint for the given worker ID from the master. */
-  Endpoint GetEndpoint(int worker_id);
+  Endpoint GetEndpoint(int worker_id) override;

   /** Starts listening for a remote shutdown command (issued by the master).
    * Blocks the calling thread until that has finished. */

@@ -33,6 +34,6 @@ class WorkerCoordination {
   communication::messaging::System &system_;
   communication::rpc::Client client_;
   communication::rpc::Server server_;
-  ConcurrentMap<int, Endpoint> endpoint_cache_;
+  mutable ConcurrentMap<int, Endpoint> endpoint_cache_;
 };
 }  // namespace distributed
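Worker-side usage, for orientation (endpoint values are illustrative; master_endpoint is assumed to come from configuration):

communication::messaging::System system{{"127.0.0.1", 0}};
distributed::WorkerCoordination coord{system, master_endpoint};
int my_id = coord.RegisterWorker(1);  // ask for id 1, or get the next free one
auto master = coord.GetEndpoint(0);   // id 0 is always the master
coord.WaitForShutdown();              // blocks until the master sends StopWorker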
src/distributed/remote_cache.hpp (new file, 79 lines)
@@ -0,0 +1,79 @@
#pragma once

#include <mutex>
#include <unordered_map>

#include "glog/logging.h"

#include "distributed/remote_data_rpc_clients.hpp"
#include "storage/gid.hpp"
#include "transactions/transaction.hpp"

namespace distributed {

/**
 * Used for caching Vertices and Edges that are stored on another worker in a
 * distributed system. Maps global IDs to (old, new) Vertex/Edge pointer
 * pairs. It is possible that either "old" or "new" are nullptrs, but at
 * least one must be not-null. The RemoteCache is the owner of TRecord
 * objects it points to. This class is thread-safe, because its owning
 * GraphDbAccessor is thread-safe.
 *
 * @tparam TRecord - Edge or Vertex
 */
template <typename TRecord>
class RemoteCache {
  using rec_uptr = std::unique_ptr<TRecord>;

 public:
  RemoteCache(distributed::RemoteDataRpcClients &remote_data_clients)
      : remote_data_clients_(remote_data_clients) {}

  /**
   * Returns the "new" Vertex/Edge for the given gid.
   *
   * @param gid - global ID.
   * @param init_if_necessary - If "new" is not initialized and this flag is
   * set, then "new" is initialized with a copy of "old" before returning.
   */
  // TODO most likely remove this function in the new remote_data_comm arch
  TRecord *FindNew(gid::Gid gid, bool init_if_necessary) {
    auto found = cache_.find(gid);
    DCHECK(found != cache_.end()) << "Uninitialized remote Vertex/Edge";
    auto &pair = found->second;
    if (!pair.second && init_if_necessary) {
      pair.second = std::unique_ptr<TRecord>(pair.first->CloneData());
    }
    return pair.second.get();
  }

  /**
   * For the Vertex/Edge with the given global ID, looks for the data visible
   * from the given transaction's ID and command ID, and caches it. Sets the
   * given pointers to point to the fetched data. Analogue to
   * mvcc::VersionList::find_set_old_new.
   */
  void FindSetOldNew(tx::transaction_id_t tx_id, int worker_id, gid::Gid gid,
                     TRecord *&old_record, TRecord *&new_record) {
    std::lock_guard<std::mutex> guard{lock_};
    auto found = cache_.find(gid);
    if (found == cache_.end()) {
      rec_uptr old_record =
          remote_data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
      found = cache_
                  .emplace(gid,
                           std::make_pair<rec_uptr, rec_uptr>(nullptr, nullptr))
                  .first;
      found->second.first.swap(old_record);
    }

    old_record = found->second.first.get();
    new_record = found->second.second.get();
  }

 private:
  std::mutex lock_;
  distributed::RemoteDataRpcClients &remote_data_clients_;
  // TODO it'd be better if we had VertexData and EdgeData in here, as opposed
  // to Vertex and Edge.
  std::unordered_map<gid::Gid, std::pair<rec_uptr, rec_uptr>> cache_;
};
}  // namespace distributed
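A sketch of the cache's contract; the transaction id, owning worker id and global id are assumed to come from the record's global address, as RecordAccessor::Reconstruct now passes them:

distributed::RemoteCache<Vertex> cache{db.remote_data_clients()};

Vertex *old_rec = nullptr;
Vertex *new_rec = nullptr;
// The first call RPCs the owning worker and caches the fetched record as
// "old"; later calls for the same gid are served locally under the mutex.
cache.FindSetOldNew(tx_id, owner_worker_id, gid, old_rec, new_rec);
// old_rec now points at the cached committed version; new_rec stays null
// until something initializes a local "new" copy (see FindNew).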
src/distributed/remote_data_rpc_clients.hpp (new file, 85 lines)
@@ -0,0 +1,85 @@
#pragma once

#include <mutex>
#include <utility>

#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "database/state_delta.hpp"
#include "distributed/coordination.hpp"
#include "distributed/remote_data_rpc_messages.hpp"
#include "storage/gid.hpp"
#include "transactions/type.hpp"

namespace distributed {

/** Provides access to other workers' data. */
class RemoteDataRpcClients {
  using Client = communication::rpc::Client;

 public:
  RemoteDataRpcClients(communication::messaging::System &system,
                       Coordination &coordination)
      : system_(system), coordination_(coordination) {}

  /// Returns a remote worker's data for the given params. That worker must own
  /// the vertex for the given id, and that vertex must be visible in the given
  /// transaction.
  std::unique_ptr<Vertex> RemoteVertex(int worker_id,
                                       tx::transaction_id_t tx_id,
                                       gid::Gid gid) {
    auto response = RemoteDataClient(worker_id).Call<RemoteVertexRpc>(
        kRemoteDataRpcTimeout, TxGidPair{tx_id, gid});
    return std::move(response->name_output_);
  }

  /// Returns a remote worker's data for the given params. That worker must own
  /// the edge for the given id, and that edge must be visible in the given
  /// transaction.
  std::unique_ptr<Edge> RemoteEdge(int worker_id, tx::transaction_id_t tx_id,
                                   gid::Gid gid) {
    auto response = RemoteDataClient(worker_id).Call<RemoteEdgeRpc>(
        kRemoteDataRpcTimeout, TxGidPair{tx_id, gid});
    return std::move(response->name_output_);
  }

  template <typename TRecord>
  std::unique_ptr<TRecord> RemoteElement(int worker_id,
                                         tx::transaction_id_t tx_id,
                                         gid::Gid gid);

 private:
  communication::messaging::System &system_;
  // TODO make Coordination const, its member GetEndpoint must be const too.
  Coordination &coordination_;

  std::unordered_map<int, Client> clients_;
  std::mutex lock_;

  Client &RemoteDataClient(int worker_id) {
    std::lock_guard<std::mutex> guard{lock_};
    auto found = clients_.find(worker_id);
    if (found != clients_.end()) return found->second;
    return clients_
        .emplace(
            std::piecewise_construct, std::forward_as_tuple(worker_id),
            std::forward_as_tuple(system_, coordination_.GetEndpoint(worker_id),
                                  kRemoteDataRpcName))
        .first->second;
  }
};

template <>
inline std::unique_ptr<Edge> RemoteDataRpcClients::RemoteElement(
    int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
  return RemoteEdge(worker_id, tx_id, gid);
}

template <>
inline std::unique_ptr<Vertex> RemoteDataRpcClients::RemoteElement(
    int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
  return RemoteVertex(worker_id, tx_id, gid);
}

}  // namespace distributed
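A caller's view of the client class; RemoteElement<TRecord> picks the right RPC at compile time (worker id, transaction id and gid are placeholders):

distributed::RemoteDataRpcClients clients{system, coordination};

// Fetch a committed vertex owned by worker 2, visible in transaction tx_id.
std::unique_ptr<Vertex> v = clients.RemoteElement<Vertex>(2, tx_id, gid);
// Equivalent to clients.RemoteVertex(2, tx_id, gid); the Edge specialization
// forwards to RemoteEdge the same way.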
src/distributed/remote_data_rpc_messages.hpp (new file, 74 lines)
@@ -0,0 +1,74 @@
#pragma once

#include <memory>
#include <string>

#include "communication/messaging/local.hpp"
#include "communication/rpc/rpc.hpp"
#include "distributed/serialization.hpp"
#include "storage/edge.hpp"
#include "storage/gid.hpp"
#include "storage/vertex.hpp"
#include "transactions/type.hpp"
#include "utils/rpc_pimp.hpp"

namespace distributed {
const std::string kRemoteDataRpcName = "RemoteDataRpc";
const auto kRemoteDataRpcTimeout = 100ms;

struct TxGidPair {
  tx::transaction_id_t tx_id;
  gid::Gid gid;

 private:
  friend class boost::serialization::access;

  template <class TArchive>
  void serialize(TArchive &ar, unsigned int) {
    ar &tx_id;
    ar &gid;
  }
};

#define MAKE_RESPONSE(type, name)                                      \
  class Remote##type##Res : public communication::messaging::Message { \
   public:                                                             \
    Remote##type##Res() {}                                             \
    Remote##type##Res(const type *name, int worker_id)                 \
        : name_input_(name), worker_id_(worker_id) {}                  \
                                                                       \
    template <class TArchive>                                          \
    void save(TArchive &ar, unsigned int) const {                      \
      ar << boost::serialization::base_object<                         \
          const communication::messaging::Message>(*this);             \
      Save##type(ar, *name_input_, worker_id_);                        \
    }                                                                  \
                                                                       \
    template <class TArchive>                                          \
    void load(TArchive &ar, unsigned int) {                            \
      ar >> boost::serialization::base_object<                         \
                communication::messaging::Message>(*this);             \
      auto v = Load##type(ar);                                         \
      v.swap(name_output_);                                            \
    }                                                                  \
    BOOST_SERIALIZATION_SPLIT_MEMBER()                                 \
                                                                       \
    const type *name_input_;                                           \
    int worker_id_;                                                    \
    std::unique_ptr<type> name_output_;                                \
  };

MAKE_RESPONSE(Vertex, vertex)
MAKE_RESPONSE(Edge, edge)

#undef MAKE_RESPONSE

RPC_SINGLE_MEMBER_MESSAGE(RemoteVertexReq, TxGidPair);
RPC_SINGLE_MEMBER_MESSAGE(RemoteEdgeReq, TxGidPair);

using RemoteVertexRpc =
    communication::rpc::RequestResponse<RemoteVertexReq, RemoteVertexRes>;
using RemoteEdgeRpc =
    communication::rpc::RequestResponse<RemoteEdgeReq, RemoteEdgeRes>;

}  // namespace distributed
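The response type generated by MAKE_RESPONSE is deliberately asymmetric: the split save/load means the two ends of one RPC see different members populated. Roughly, for Vertex (a sketch; received_response stands for whatever object boost deserialization produces on the caller):

// Sender (owning worker): wraps a borrowed pointer; save() serializes it
// with SaveVertex together with the owning worker's id.
RemoteVertexRes response{vertex_ptr, my_worker_id};  // sets name_input_

// Receiver (calling worker): load() rebuilds an owned copy via LoadVertex,
// so the caller moves out a unique_ptr, as RemoteDataRpcClients does:
std::unique_ptr<Vertex> owned = std::move(received_response->name_output_);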
src/distributed/remote_data_rpc_server.hpp (new file, 44 lines)
@@ -0,0 +1,44 @@
#pragma once

#include <memory>

#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/remote_data_rpc_messages.hpp"
#include "transactions/type.hpp"

namespace distributed {

/** Serves this worker's data to others. */
class RemoteDataRpcServer {
  // TODO maybe reuse GraphDbAccessors. It would reduce the load on tx::Engine
  // locks (not sure what the gain would be). But have some way of cache
  // invalidation.
 public:
  RemoteDataRpcServer(database::GraphDb &db,
                      communication::messaging::System &system)
      : db_(db), system_(system) {
    rpc_server_.Register<RemoteVertexRpc>([this](const RemoteVertexReq &req) {
      database::GraphDbAccessor dba(db_, req.member.tx_id);
      auto vertex = dba.FindVertexChecked(req.member.gid, false);
      CHECK(vertex.GetOld())
          << "Old record must exist when sending vertex by RPC";
      return std::make_unique<RemoteVertexRes>(vertex.GetOld(), db_.WorkerId());
    });

    rpc_server_.Register<RemoteEdgeRpc>([this](const RemoteEdgeReq &req) {
      database::GraphDbAccessor dba(db_, req.member.tx_id);
      auto edge = dba.FindEdgeChecked(req.member.gid, false);
      CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC";
      return std::make_unique<RemoteEdgeRes>(edge.GetOld(), db_.WorkerId());
    });
  }

 private:
  database::GraphDb &db_;
  communication::messaging::System &system_;
  communication::rpc::Server rpc_server_{system_, kRemoteDataRpcName};
};
}  // namespace distributed
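Standing the server up is a one-liner next to the other per-node services, as the Master and Worker classes in graph_db.cpp now do:

// Registers RemoteVertexRpc / RemoteEdgeRpc handlers under kRemoteDataRpcName.
// Each request opens a GraphDbAccessor in the caller's transaction and ships
// the record's committed ("old") version back, tagged with this worker's id.
distributed::RemoteDataRpcServer remote_data_server{db, system};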
@@ -148,7 +148,8 @@ bool RecordAccessor<TRecord>::Reconstruct() const {
     address_.local()->find_set_old_new(db_accessor_->transaction(), old_, new_);
   } else {
     db_accessor().template remote_elements<TRecord>().FindSetOldNew(
-        db_accessor().transaction(), address_.global_id(), old_, new_);
+        db_accessor().transaction().id_, address_.worker_id(),
+        address_.global_id(), old_, new_);
   }
   current_ = old_ ? old_ : new_;
   return old_ != nullptr || new_ != nullptr;

@@ -178,8 +179,8 @@ TRecord &RecordAccessor<TRecord>::update() const {
     new_ = address_.local()->update(t);
     DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
   } else {
-    new_ = db_accessor().template remote_elements<TRecord>().FindNew(
-        address_.global_id(), true);
+    // TODO implement
+    throw std::runtime_error("Not yet implemented");
   }
   return *new_;
 }
@@ -105,6 +105,9 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
   */
  RecordAccessor<TRecord> &SwitchNew();

+  /** Returns the new record pointer. */
+  TRecord *GetNew() const { return new_; }
+
  /**
   * Attempts to switch this accessor to use the latest version not updated by
   * the current transaction+command. If that is not possible (vertex/edge was

@@ -115,6 +118,9 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
   */
  RecordAccessor<TRecord> &SwitchOld();

+  /** Returns the old record pointer. */
+  TRecord *GetOld() const { return old_; }
+
  /**
   * Reconstructs the internal state of the record accessor so it uses the
   * versions appropriate to this transaction+command.
@@ -51,7 +51,7 @@ class VertexAccessor : public RecordAccessor<Vertex> {
  public:
  VertexAccessor(VertexAddress address, database::GraphDbAccessor &db_accessor)
      : RecordAccessor(address, db_accessor) {
-    RecordAccessor::Reconstruct();
+    Reconstruct();
  }

  /** Returns the number of outgoing edges. */
@@ -61,7 +61,6 @@ tx::Transaction *WorkerEngine::RunningTransaction(tx::transaction_id_t tx_id) {
       std::move(rpc_client_.Call<SnapshotRpc>(kRpcTimeout, tx_id)->member));
   auto insertion =
       accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this));
-  CHECK(insertion.second) << "Transaction already inserted";
   utils::EnsureAtomicGe(local_last_, tx_id);
   return insertion.first->second;
 }
@@ -51,8 +51,7 @@ class Snapshot {

   /** Removes the given transaction id from this Snapshot.
    *
-   * @param id - the transaction id to remove
-   */
+   * @param id - the transaction id to remove */
   void remove(transaction_id_t id) {
     auto last =
         std::remove(transaction_ids_.begin(), transaction_ids_.end(), id);
@@ -1,3 +1,5 @@
 #pragma once

+#include <experimental/optional>
+
 #include "boost/serialization/split_free.hpp"
tests/unit/distributed_graph_db.cpp (new file, 203 lines)
@@ -0,0 +1,203 @@
#include <experimental/optional>
#include <thread>

#include "gtest/gtest.h"

#include "communication/messaging/distributed.hpp"
#include "database/graph_db.hpp"
#include "distributed/coordination.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp"
#include "distributed/remote_data_rpc_clients.hpp"
#include "distributed/remote_data_rpc_server.hpp"
#include "io/network/endpoint.hpp"
#include "transactions/engine_master.hpp"

template <typename T>
using optional = std::experimental::optional<T>;

using namespace distributed;

class DistributedGraphDbTest : public ::testing::Test {
  const std::string kLocal = "127.0.0.1";

  class WorkerInThread {
   public:
    WorkerInThread(database::Config config) : worker_(config) {
      thread_ = std::thread([this, config] { worker_.WaitForShutdown(); });
    }

    ~WorkerInThread() {
      if (thread_.joinable()) thread_.join();
    }

    database::Worker worker_;
    std::thread thread_;
  };

 protected:
  void SetUp() override {
    const auto kInitTime = 200ms;

    database::Config master_config;
    master_config.master_endpoint = {kLocal, 0};
    master_.emplace(master_config);
    std::this_thread::sleep_for(kInitTime);

    auto worker_config = [this](int worker_id) {
      database::Config config;
      config.worker_id = worker_id;
      config.master_endpoint = master_->endpoint();
      config.worker_endpoint = {kLocal, 0};
      return config;
    };

    worker1_.emplace(worker_config(1));
    std::this_thread::sleep_for(kInitTime);
    worker2_.emplace(worker_config(2));
    std::this_thread::sleep_for(kInitTime);
  }

  void TearDown() override {
    // Kill master first because it will expect a shutdown response from the
    // workers.
    master_ = std::experimental::nullopt;
    worker2_ = std::experimental::nullopt;
    worker1_ = std::experimental::nullopt;
  }

  database::Master &master() { return *master_; }
  auto &master_tx_engine() {
    return dynamic_cast<tx::MasterEngine &>(master_->tx_engine());
  }
  database::Worker &worker1() { return worker1_->worker_; }
  database::Worker &worker2() { return worker2_->worker_; }

 private:
  optional<database::Master> master_;
  optional<WorkerInThread> worker1_;
  optional<WorkerInThread> worker2_;
};

TEST_F(DistributedGraphDbTest, Coordination) {
  EXPECT_NE(master().endpoint().port(), 0);
  EXPECT_NE(worker1().endpoint().port(), 0);
  EXPECT_NE(worker2().endpoint().port(), 0);

  EXPECT_EQ(master().GetEndpoint(1), worker1().endpoint());
  EXPECT_EQ(master().GetEndpoint(2), worker2().endpoint());
  EXPECT_EQ(worker1().GetEndpoint(0), master().endpoint());
  EXPECT_EQ(worker1().GetEndpoint(2), worker2().endpoint());
  EXPECT_EQ(worker2().GetEndpoint(0), master().endpoint());
  EXPECT_EQ(worker2().GetEndpoint(1), worker1().endpoint());
}

TEST_F(DistributedGraphDbTest, TxEngine) {
  auto *tx1 = master_tx_engine().Begin();
  auto *tx2 = master_tx_engine().Begin();
  EXPECT_EQ(tx2->snapshot().size(), 1);
  EXPECT_EQ(
      worker1().tx_engine().RunningTransaction(tx1->id_)->snapshot().size(), 0);
  EXPECT_EQ(worker2().tx_engine().RunningTransaction(tx2->id_)->snapshot(),
            tx2->snapshot());

  ::testing::FLAGS_gtest_death_test_style = "fast";
  EXPECT_DEATH(worker2().tx_engine().RunningTransaction(123), "");
}

template <typename TType>
using mapper_vec =
    std::vector<std::reference_wrapper<storage::ConcurrentIdMapper<TType>>>;

TEST_F(DistributedGraphDbTest, StorageTypes) {
  auto test_mappers = [](auto mappers, auto ids) {
    for (size_t i = 0; i < mappers.size(); ++i) {
      ids.emplace_back(
          mappers[i].get().value_to_id("value" + std::to_string(i)));
    }
    EXPECT_GT(ids.size(), 0);
    for (size_t i = 0; i < mappers.size(); ++i) {
      for (size_t j = 0; j < ids.size(); ++j) {
        EXPECT_EQ(mappers[i].get().id_to_value(ids[j]),
                  "value" + std::to_string(j));
      }
    }
  };

  test_mappers(mapper_vec<storage::Label>{master().label_mapper(),
                                          worker1().label_mapper(),
                                          worker2().label_mapper()},
               std::vector<storage::Label>{});
  test_mappers(mapper_vec<storage::EdgeType>{master().edge_type_mapper(),
                                             worker1().edge_type_mapper(),
                                             worker2().edge_type_mapper()},
               std::vector<storage::EdgeType>{});
  test_mappers(mapper_vec<storage::Property>{master().property_mapper(),
                                             worker1().property_mapper(),
                                             worker2().property_mapper()},
               std::vector<storage::Property>{});
}

TEST_F(DistributedGraphDbTest, Counters) {
  EXPECT_EQ(master().counters().Get("a"), 0);
  EXPECT_EQ(worker1().counters().Get("a"), 1);
  EXPECT_EQ(worker2().counters().Get("a"), 2);

  EXPECT_EQ(worker1().counters().Get("b"), 0);
  EXPECT_EQ(worker2().counters().Get("b"), 1);
  EXPECT_EQ(master().counters().Get("b"), 2);
}

TEST_F(DistributedGraphDbTest, RemoteDataGetting) {
  using GraphDbAccessor = database::GraphDbAccessor;
  // Only old data is visible remotely, so create and commit some data.
  gid::Gid v1_id, v2_id, e1_id;

  {
    GraphDbAccessor dba{master()};
    auto v1 = dba.InsertVertex();
    auto v2 = dba.InsertVertex();
    auto e1 = dba.InsertEdge(v1, v2, dba.EdgeType("et"));

    // Set some data so we see we're getting the right stuff.
    v1.PropsSet(dba.Property("p1"), 42);
    v1.add_label(dba.Label("label"));
    v2.PropsSet(dba.Property("p2"), "value");
    e1.PropsSet(dba.Property("p3"), true);

    v1_id = v1.gid();
    v2_id = v2.gid();
    e1_id = e1.gid();

    dba.Commit();
  }

  // The master must start a transaction before workers can work in it.
  database::GraphDbAccessor master_dba{master()};

  {
    database::GraphDbAccessor w1_dba{worker1(), master_dba.transaction_id()};
    VertexAccessor v1_in_w1{{v1_id, 0}, w1_dba};
    EXPECT_NE(v1_in_w1.GetOld(), nullptr);
    EXPECT_EQ(v1_in_w1.GetNew(), nullptr);
    EXPECT_EQ(v1_in_w1.PropsAt(w1_dba.Property("p1")).Value<int64_t>(), 42);
    EXPECT_TRUE(v1_in_w1.has_label(w1_dba.Label("label")));
  }

  {
    database::GraphDbAccessor w2_dba{worker2(), master_dba.transaction_id()};
    VertexAccessor v2_in_w2{{v2_id, 0}, w2_dba};
    EXPECT_NE(v2_in_w2.GetOld(), nullptr);
    EXPECT_EQ(v2_in_w2.GetNew(), nullptr);
    EXPECT_EQ(v2_in_w2.PropsAt(w2_dba.Property("p2")).Value<std::string>(),
              "value");
    EXPECT_FALSE(v2_in_w2.has_label(w2_dba.Label("label")));

    VertexAccessor v1_in_w2{{v1_id, 0}, w2_dba};
    EdgeAccessor e1_in_w2{{e1_id, 0}, w2_dba};
    EXPECT_EQ(e1_in_w2.from(), v1_in_w2);
    EXPECT_EQ(e1_in_w2.to(), v2_in_w2);
    EXPECT_EQ(e1_in_w2.EdgeType(), w2_dba.EdgeType("et"));
    EXPECT_EQ(e1_in_w2.PropsAt(w2_dba.Property("p3")).Value<bool>(), true);
  }
}