diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp
index 551b0060f..6ddd35421 100644
--- a/src/communication/rpc/messages-inl.hpp
+++ b/src/communication/rpc/messages-inl.hpp
@@ -3,6 +3,7 @@
 #include "boost/serialization/export.hpp"
 
 #include "distributed/coordination_rpc_messages.hpp"
+#include "distributed/remote_data_rpc_messages.hpp"
 #include "storage/concurrent_id_mapper_rpc_messages.hpp"
 #include "transactions/engine_rpc_messages.hpp"
@@ -27,9 +28,17 @@ ID_VALUE_EXPORT_BOOST_TYPE(Property)
 
 #undef ID_VALUE_EXPORT_BOOST_TYPE
 
+// Distributed coordination.
 BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq);
 BOOST_CLASS_EXPORT(distributed::RegisterWorkerRes);
 BOOST_CLASS_EXPORT(distributed::GetEndpointReq);
 BOOST_CLASS_EXPORT(distributed::GetEndpointRes);
 BOOST_CLASS_EXPORT(distributed::StopWorkerReq);
 BOOST_CLASS_EXPORT(distributed::StopWorkerRes);
+
+// Distributed data exchange.
+BOOST_CLASS_EXPORT(distributed::RemoteEdgeReq);
+BOOST_CLASS_EXPORT(distributed::RemoteEdgeRes);
+BOOST_CLASS_EXPORT(distributed::RemoteVertexReq);
+BOOST_CLASS_EXPORT(distributed::RemoteVertexRes);
+BOOST_CLASS_EXPORT(distributed::TxGidPair);
diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp
index d3c7e9a7f..7b2e1a02d 100644
--- a/src/database/graph_db.cpp
+++ b/src/database/graph_db.cpp
@@ -1,7 +1,11 @@
-#include "database/graph_db.hpp"
+#include "glog/logging.h"
+
 #include "communication/messaging/distributed.hpp"
+#include "database/graph_db.hpp"
 #include "distributed/coordination_master.hpp"
 #include "distributed/coordination_worker.hpp"
+#include "distributed/remote_data_rpc_clients.hpp"
+#include "distributed/remote_data_rpc_server.hpp"
 #include "durability/paths.hpp"
 #include "durability/recovery.hpp"
 #include "durability/snapshooter.hpp"
@@ -64,36 +68,61 @@ struct TypemapPack {
 
 class SingleNode : public PrivateBase {
  public:
   explicit SingleNode(const Config &config) : PrivateBase(config) {}
+  GraphDb::Type type() const override { return GraphDb::Type::SINGLE_NODE; }
   IMPL_GETTERS
 
- private:
   tx::SingleNodeEngine tx_engine_{&wal_};
   StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
   TypemapPack<SingleNodeConcurrentIdMapper> typemap_pack_;
   database::SingleNodeCounters counters_;
+
+  distributed::RemoteDataRpcServer &remote_data_server() override {
+    LOG(FATAL) << "Remote data server not available in single-node.";
+  }
+  distributed::RemoteDataRpcClients &remote_data_clients() override {
+    LOG(FATAL) << "Remote data clients not available in single-node.";
+  }
 };
 
+#define IMPL_DISTRIBUTED_GETTERS                                       \
+  distributed::RemoteDataRpcServer &remote_data_server() override {    \
+    return remote_data_server_;                                        \
+  }                                                                    \
+  distributed::RemoteDataRpcClients &remote_data_clients() override {  \
+    return remote_data_clients_;                                       \
+  }
+
 class Master : public PrivateBase {
  public:
   explicit Master(const Config &config) : PrivateBase(config) {}
+  GraphDb::Type type() const override {
+    return GraphDb::Type::DISTRIBUTED_MASTER;
+  }
   IMPL_GETTERS
+  IMPL_DISTRIBUTED_GETTERS
 
- private:
   communication::messaging::System system_{config_.master_endpoint};
   tx::MasterEngine tx_engine_{system_, &wal_};
   StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
-  distributed::MasterCoordination coordination{system_};
+  distributed::MasterCoordination coordination_{system_};
   TypemapPack<MasterConcurrentIdMapper> typemap_pack_{system_};
   database::MasterCounters counters_{system_};
+  distributed::RemoteDataRpcServer remote_data_server_{*this, system_};
+  distributed::RemoteDataRpcClients remote_data_clients_{system_,
+                                                         coordination_};
 };
 
 class Worker : public PrivateBase {
  public:
-  explicit Worker(const Config &config) : PrivateBase(config) {}
-  IMPL_GETTERS
-  void WaitForShutdown() { coordination_.WaitForShutdown(); }
+  explicit Worker(const Config &config) : PrivateBase(config) {
+    coordination_.RegisterWorker(config.worker_id);
+  }
+
+  GraphDb::Type type() const override {
+    return GraphDb::Type::DISTRIBUTED_WORKER;
+  }
+  IMPL_GETTERS
+  IMPL_DISTRIBUTED_GETTERS
 
- private:
   communication::messaging::System system_{config_.worker_endpoint};
   distributed::WorkerCoordination coordination_{system_,
                                                 config_.master_endpoint};
@@ -102,6 +131,9 @@ class Worker : public PrivateBase {
   TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{system_,
                                                       config_.master_endpoint};
   database::WorkerCounters counters_{system_, config_.master_endpoint};
+  distributed::RemoteDataRpcServer remote_data_server_{*this, system_};
+  distributed::RemoteDataRpcClients remote_data_clients_{system_,
+                                                         coordination_};
 };
 
 #undef IMPL_GETTERS
@@ -127,6 +159,7 @@ PublicBase::~PublicBase() {
   if (impl_->config_.snapshot_on_exit) MakeSnapshot();
 }
 
+GraphDb::Type PublicBase::type() const { return impl_->type(); }
 Storage &PublicBase::storage() { return impl_->storage(); }
 durability::WriteAheadLog &PublicBase::wal() { return impl_->wal(); }
 tx::Engine &PublicBase::tx_engine() { return impl_->tx_engine(); }
@@ -142,6 +175,12 @@ ConcurrentIdMapper<Property> &PublicBase::property_mapper() {
 database::Counters &PublicBase::counters() { return impl_->counters(); }
 void PublicBase::CollectGarbage() { impl_->CollectGarbage(); }
 int PublicBase::WorkerId() const { return impl_->WorkerId(); }
+distributed::RemoteDataRpcServer &PublicBase::remote_data_server() {
+  return impl_->remote_data_server();
+}
+distributed::RemoteDataRpcClients &PublicBase::remote_data_clients() {
+  return impl_->remote_data_clients();
+}
 
 void PublicBase::MakeSnapshot() {
   const bool status = durability::MakeSnapshot(
@@ -187,10 +226,28 @@ SingleNode::SingleNode(Config config)
 Master::Master(Config config)
     : MasterBase(std::make_unique<impl::Master>(config)) {}
 
+io::network::Endpoint Master::endpoint() const {
+  return dynamic_cast<impl::Master *>(impl_.get())->system_.endpoint();
+}
+
+io::network::Endpoint Master::GetEndpoint(int worker_id) {
+  return dynamic_cast<impl::Master *>(impl_.get())
+      ->coordination_.GetEndpoint(worker_id);
+}
+
 Worker::Worker(Config config)
     : PublicBase(std::make_unique<impl::Worker>(config)) {}
 
+io::network::Endpoint Worker::endpoint() const {
+  return dynamic_cast<impl::Worker *>(impl_.get())->system_.endpoint();
+}
+
+io::network::Endpoint Worker::GetEndpoint(int worker_id) {
+  return dynamic_cast<impl::Worker *>(impl_.get())
+      ->coordination_.GetEndpoint(worker_id);
+}
+
 void Worker::WaitForShutdown() {
-  dynamic_cast<impl::Worker *>(impl_.get())->WaitForShutdown();
+  dynamic_cast<impl::Worker *>(impl_.get())->coordination_.WaitForShutdown();
 }
 }  // namespace database
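
Note: `remote_data_server()` and `remote_data_clients()` LOG(FATAL) on a single-node instance, so callers are expected to branch on the new `GraphDb::Type` discriminator first. A minimal caller-side sketch (the helper name is illustrative, not part of the patch):

    #include "database/graph_db.hpp"

    // Hypothetical helper: touch remote data only when the instance is
    // actually distributed; a single-node instance would LOG(FATAL).
    void MaybeUseRemoteData(database::GraphDb &db) {
      if (db.type() == database::GraphDb::Type::SINGLE_NODE) return;
      auto &clients = db.remote_data_clients();
      (void)clients;  // ... issue remote reads here ...
    }
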
diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp
index c626e640c..e60edcc3a 100644
--- a/src/database/graph_db.hpp
+++ b/src/database/graph_db.hpp
@@ -3,9 +3,6 @@
 #include <atomic>
 #include <memory>
 
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
 #include "database/counters.hpp"
 #include "database/storage.hpp"
 #include "database/storage_gc.hpp"
@@ -16,6 +13,11 @@
 #include "transactions/engine.hpp"
 #include "utils/scheduler.hpp"
 
+namespace distributed {
+  class RemoteDataRpcServer;
+  class RemoteDataRpcClients;
+}
+
 namespace database {
 
 /// Database configuration. Initialized from flags, but modifiable.
@@ -64,9 +66,16 @@ struct Config {
  */
 class GraphDb {
  public:
+  enum class Type {
+    SINGLE_NODE,
+    DISTRIBUTED_MASTER,
+    DISTRIBUTED_WORKER
+  };
+
   GraphDb() {}
   virtual ~GraphDb() {}
 
+  virtual Type type() const = 0;
   virtual Storage &storage() = 0;
   virtual durability::WriteAheadLog &wal() = 0;
   virtual tx::Engine &tx_engine() = 0;
@@ -78,6 +87,10 @@ class GraphDb {
   virtual void CollectGarbage() = 0;
   virtual int WorkerId() const = 0;
 
+  // Supported only in distributed master and worker, not in single-node.
+  virtual distributed::RemoteDataRpcServer &remote_data_server() = 0;
+  virtual distributed::RemoteDataRpcClients &remote_data_clients() = 0;
+
   GraphDb(const GraphDb &) = delete;
   GraphDb(GraphDb &&) = delete;
   GraphDb &operator=(const GraphDb &) = delete;
@@ -94,6 +107,7 @@ class PrivateBase;
 // initialization and cleanup.
 class PublicBase : public GraphDb {
  public:
+  Type type() const override;
   Storage &storage() override;
   durability::WriteAheadLog &wal() override;
   tx::Engine &tx_engine() override;
@@ -103,6 +117,8 @@ class PublicBase : public GraphDb {
   database::Counters &counters() override;
   void CollectGarbage() override;
   int WorkerId() const override;
+  distributed::RemoteDataRpcServer &remote_data_server() override;
+  distributed::RemoteDataRpcClients &remote_data_clients() override;
 
  protected:
   explicit PublicBase(std::unique_ptr<PrivateBase> impl);
@@ -138,11 +154,21 @@ class SingleNode : public MasterBase {
 class Master : public MasterBase {
  public:
   explicit Master(Config config = Config());
+  /** Gets this master's endpoint. */
+  io::network::Endpoint endpoint() const;
+  /** Gets the endpoint of the worker with the given id. */
+  // TODO make const once Coordination::GetEndpoint is const.
+  io::network::Endpoint GetEndpoint(int worker_id);
 };
 
 class Worker : public impl::PublicBase {
  public:
   explicit Worker(Config config = Config());
+  /** Gets this worker's endpoint. */
+  io::network::Endpoint endpoint() const;
+  /** Gets the endpoint of the worker with the given id. */
+  // TODO make const once Coordination::GetEndpoint is const.
+  io::network::Endpoint GetEndpoint(int worker_id);
   void WaitForShutdown();
 };
 }  // namespace database
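
The new public `endpoint()`/`GetEndpoint()` API makes cluster bring-up scriptable without reaching into the pimpl internals. A sketch under the same assumptions as the unit test at the end of this diff (loopback addresses, port 0 meaning "any free port"):

    database::Config master_config;
    master_config.master_endpoint = {"127.0.0.1", 0};
    database::Master master{master_config};

    database::Config worker_config;
    worker_config.worker_id = 1;
    worker_config.master_endpoint = master.endpoint();
    worker_config.worker_endpoint = {"127.0.0.1", 0};
    database::Worker worker{worker_config};

    // Any node can resolve any worker's endpoint; ID 0 is always the master.
    auto w1_endpoint = master.GetEndpoint(1);
    auto master_endpoint = worker.GetEndpoint(0);
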
diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp
index daa337f9a..d46da8d0d 100644
--- a/src/database/graph_db_accessor.cpp
+++ b/src/database/graph_db_accessor.cpp
@@ -12,13 +12,27 @@ namespace database {
 
 GraphDbAccessor::GraphDbAccessor(GraphDb &db)
-    : db_(db), transaction_(*SingleNodeEngine().Begin()) {}
+    : db_(db),
+      transaction_(*SingleNodeEngine().Begin()),
+      transaction_starter_{true} {
+  if (db_.type() != GraphDb::Type::SINGLE_NODE) {
+    remote_vertices_.emplace(db_.remote_data_clients());
+    remote_edges_.emplace(db_.remote_data_clients());
+  }
+}
 
 GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id)
-    : db_(db), transaction_(*db.tx_engine().RunningTransaction(tx_id)) {}
+    : db_(db),
+      transaction_(*db.tx_engine().RunningTransaction(tx_id)),
+      transaction_starter_{false} {
+  if (db_.type() != GraphDb::Type::SINGLE_NODE) {
+    remote_vertices_.emplace(db_.remote_data_clients());
+    remote_edges_.emplace(db_.remote_data_clients());
+  }
+}
 
 GraphDbAccessor::~GraphDbAccessor() {
-  if (!commited_ && !aborted_) {
+  if (transaction_starter_ && !commited_ && !aborted_) {
     this->Abort();
   }
 }
@@ -78,6 +92,13 @@ std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertex(
   return record_accessor;
 }
 
+VertexAccessor GraphDbAccessor::FindVertexChecked(gid::Gid gid,
+                                                  bool current_state) {
+  auto found = FindVertex(gid, current_state);
+  CHECK(found) << "Unable to find vertex for id: " << gid;
+  return *found;
+}
+
 std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdge(
     gid::Gid gid, bool current_state) {
   auto collection_accessor = db_.storage().edges_.access();
@@ -89,6 +110,13 @@ std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdge(
   return record_accessor;
 }
 
+EdgeAccessor GraphDbAccessor::FindEdgeChecked(gid::Gid gid,
+                                              bool current_state) {
+  auto found = FindEdge(gid, current_state);
+  CHECK(found) << "Unable to find edge for id: " << gid;
+  return *found;
+}
+
 void GraphDbAccessor::BuildIndex(storage::Label label,
                                  storage::Property property) {
   DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
@@ -433,16 +461,16 @@ std::vector<std::string> GraphDbAccessor::IndexInfo() const {
   }
   return info;
 }
-auto &GraphDbAccessor::remote_vertices() { return remote_vertices_; }
-auto &GraphDbAccessor::remote_edges() { return remote_edges_; }
+auto &GraphDbAccessor::remote_vertices() { return *remote_vertices_; }
+auto &GraphDbAccessor::remote_edges() { return *remote_edges_; }
 
 template <>
-GraphDbAccessor::RemoteCache<Vertex> &GraphDbAccessor::remote_elements() {
+distributed::RemoteCache<Vertex> &GraphDbAccessor::remote_elements() {
   return remote_vertices();
 }
 
 template <>
-GraphDbAccessor::RemoteCache<Edge> &GraphDbAccessor::remote_elements() {
+distributed::RemoteCache<Edge> &GraphDbAccessor::remote_elements() {
   return remote_edges();
 }
 }  // namespace database
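
The `transaction_starter_` flag encodes an ownership rule: only the accessor that began a transaction aborts it on destruction; accessors attached to an existing transaction (the worker-side constructor) leave its lifetime to the starter. A sketch, assuming a master and a worker `GraphDb` are already up:

    database::GraphDbAccessor starter{master_db};  // transaction_starter_ == true
    {
      // Attached to the same transaction; transaction_starter_ == false.
      database::GraphDbAccessor attached{worker_db, starter.transaction_id()};
    }  // attached's destructor does NOT abort the shared transaction
    starter.Commit();  // or let starter's destructor Abort()
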
diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp
index e0412d3e8..bac191f09 100644
--- a/src/database/graph_db_accessor.hpp
+++ b/src/database/graph_db_accessor.hpp
@@ -8,6 +8,7 @@
 #include "glog/logging.h"
 
 #include "database/graph_db.hpp"
+#include "distributed/remote_cache.hpp"
 #include "storage/edge_accessor.hpp"
 #include "storage/types.hpp"
 #include "storage/vertex_accessor.hpp"
@@ -38,60 +39,6 @@ class GraphDbAccessor {
   friend class ::VertexAccessor;
   friend class ::EdgeAccessor;
 
-  /**
-   * Used for caching Vertices and Edges that are stored on another worker in a
-   * distributed system. Maps global IDs to (old, new) Vertex/Edge pointer
-   * pairs. It is possible that either "old" or "new" are nullptrs, but at
-   * least one must be not-null. The RemoteCache is the owner of TRecord
-   * objects it points to.
-   *
-   * @tparam TRecord - Edge or Vertex
-   */
-  template <typename TRecord>
-  class RemoteCache {
-   public:
-    ~RemoteCache() {
-      for (const auto &pair : cache_) {
-        delete pair.second.first;
-        delete pair.second.second;
-      }
-    }
-
-    /**
-     * Returns the "new" Vertex/Edge for the given gid.
-     *
-     * @param gid - global ID.
-     * @param init_if_necessary - If "new" is not initialized and this flag is
-     * set, then "new" is initialized with a copy of "old" before returning.
-     */
-    TRecord *FindNew(gid::Gid gid, bool init_if_necessary) {
-      auto found = cache_.find(gid);
-      DCHECK(found != cache_.end()) << "Uninitialized remote Vertex/Edge";
-      auto &pair = found->second;
-      if (!pair.second && init_if_necessary) {
-        pair.second = pair.first->CloneData();
-      }
-      return pair.second;
-    }
-
-    /**
-     * For the Vertex/Edge with the given global ID, looks for the data visible
-     * from the given transaction's ID and command ID, and caches it. Sets the
-     * given pointers to point to the fetched data. Analogue to
-     * mvcc::VersionList::find_set_old_new.
-     */
-    void FindSetOldNew(const tx::Transaction &, gid::Gid, TRecord *&,
-                       TRecord *&) {
-      LOG(ERROR) << "Remote data storage not yet implemented";
-      // TODO fetch data for (gid, t.id_, t.cmd_id()) from remote worker.
-      // Set that data in the cache.
-      // Set the pointers to the new data.
-    }
-
-   private:
-    std::unordered_map<gid::Gid, std::pair<TRecord *, TRecord *>> cache_;
-  };
-
  public:
   /** Creates a new accessor by starting a new transaction. Only applicable to
    * the single-node or distributed master. */
@@ -160,6 +107,9 @@ class GraphDbAccessor {
   std::experimental::optional<VertexAccessor> FindVertex(gid::Gid gid,
                                                          bool current_state);
 
+  /** Like `FindVertex`, but performs a CHECK that the result is found. */
+  VertexAccessor FindVertexChecked(gid::Gid gid, bool current_state);
+
   /**
    * Returns iterable over accessors to all the vertices in the graph
    * visible to the current transaction.
@@ -351,6 +301,10 @@ class GraphDbAccessor {
    */
   std::experimental::optional<EdgeAccessor> FindEdge(gid::Gid gid,
                                                      bool current_state);
+
+  /** Like `FindEdge`, but performs a CHECK that the result is found. */
+  EdgeAccessor FindEdgeChecked(gid::Gid gid, bool current_state);
+
   /**
    * Returns iterable over accessors to all the edges in the graph
    * visible to the current transaction.
@@ -583,17 +537,20 @@ class GraphDbAccessor {
 
   /** Gets remote_vertices or remote_edges, depending on type param. */
   template <typename TRecord>
-  RemoteCache<TRecord> &remote_elements();
+  distributed::RemoteCache<TRecord> &remote_elements();
 
  private:
   GraphDb &db_;
   tx::Transaction &transaction_;
+  // Indicates if this db-accessor started the transaction and should Abort it
+  // upon destruction.
+  bool transaction_starter_;
   bool commited_{false};
   bool aborted_{false};
 
-  RemoteCache<Vertex> remote_vertices_;
-  RemoteCache<Edge> remote_edges_;
+  std::experimental::optional<distributed::RemoteCache<Vertex>> remote_vertices_;
+  std::experimental::optional<distributed::RemoteCache<Edge>> remote_edges_;
 
   /** Casts the transaction engine to SingleNodeEngine and returns it. If the
    * engine is a WorkerEngine (and not SingleNode nor Master), a call to this
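
The `remote_elements<TRecord>()` specializations exist so that generic record code (such as `RecordAccessor<TRecord>` later in this diff) can pick the right cache without branching on the record type. An illustrative use, with a hypothetical helper name:

    template <typename TRecord>
    void WarmUp(database::GraphDbAccessor &dba, tx::transaction_id_t tx_id,
                int worker_id, gid::Gid gid) {
      // Resolves to RemoteCache<Vertex> or RemoteCache<Edge>.
      auto &cache = dba.remote_elements<TRecord>();
      TRecord *old_rec = nullptr, *new_rec = nullptr;
      cache.FindSetOldNew(tx_id, worker_id, gid, old_rec, new_rec);
    }
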
diff --git a/src/distributed/coordination.hpp b/src/distributed/coordination.hpp
new file mode 100644
index 000000000..0527bf36c
--- /dev/null
+++ b/src/distributed/coordination.hpp
@@ -0,0 +1,16 @@
+#pragma once
+
+#include "io/network/endpoint.hpp"
+
+namespace distributed {
+
+/** API for the distributed coordination class. */
+class Coordination {
+ public:
+  virtual ~Coordination() {}
+
+  /** Gets the endpoint for the given worker ID from the master. */
+  virtual io::network::Endpoint GetEndpoint(int worker_id) = 0;
+};
+
+}  // namespace distributed
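
Extracting the `Coordination` interface also makes the RPC clients testable in isolation. A hypothetical test double (not part of this patch; assumes `Endpoint` is constructible from an address string and a port, as in the configs used by the unit test below):

    class FakeCoordination : public distributed::Coordination {
     public:
      io::network::Endpoint GetEndpoint(int worker_id) override {
        // Deterministic fake: worker N "listens" on port 10000 + N.
        return {"127.0.0.1", static_cast<uint16_t>(10000 + worker_id)};
      }
    };
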
diff --git a/src/distributed/coordination_master.cpp b/src/distributed/coordination_master.cpp
index 49dd8c7ff..90d1f8ba1 100644
--- a/src/distributed/coordination_master.cpp
+++ b/src/distributed/coordination_master.cpp
@@ -5,6 +5,9 @@ namespace distributed {
 
 MasterCoordination::MasterCoordination(communication::messaging::System &system)
     : system_(system), server_(system, kCoordinationServerName) {
+  // The master is always worker 0.
+  workers_.emplace(0, system.endpoint());
+
   server_.Register<RegisterWorkerRpc>([this](const RegisterWorkerReq &req) {
     auto worker_id = RegisterWorker(req.desired_worker_id, req.endpoint);
     return std::make_unique<RegisterWorkerRes>(worker_id);
@@ -17,16 +20,19 @@ MasterCoordination::MasterCoordination(communication::messaging::System &system)
 int MasterCoordination::RegisterWorker(int desired_worker_id,
                                        Endpoint endpoint) {
   std::lock_guard<std::mutex> guard(lock_);
-  int worker_id = desired_worker_id;
-  // Check if the desired ID is available.
-  if (workers_.find(worker_id) != workers_.end()) {
-    if (desired_worker_id >= 0)
-      LOG(WARNING) << "Unable to assign requested ID (" << worker_id
-                   << ") to worker at \"" << endpoint.address() << ":"
-                   << endpoint.port() << "\"";
-    worker_id = 1;
+
+  // If there is a desired worker ID, try to set it.
+  if (desired_worker_id >= 0) {
+    if (workers_.find(desired_worker_id) == workers_.end()) {
+      workers_.emplace(desired_worker_id, endpoint);
+      return desired_worker_id;
+    }
+    LOG(WARNING) << "Unable to assign requested ID (" << desired_worker_id
+                 << ") to worker at: " << endpoint;
   }
 
+  // Look for the next ID that's not used.
+  int worker_id = 1;
   while (workers_.find(worker_id) != workers_.end()) ++worker_id;
   workers_.emplace(worker_id, endpoint);
   return worker_id;
@@ -35,6 +41,8 @@ int MasterCoordination::RegisterWorker(int desired_worker_id,
 MasterCoordination::~MasterCoordination() {
   std::lock_guard<std::mutex> guard(lock_);
   for (const auto &kv : workers_) {
+    // Skip master (self).
+    if (kv.first == 0) continue;
     communication::rpc::Client client(system_, kv.second,
                                       kCoordinationServerName);
     auto result = client.Call<StopWorkerRpc>(100ms);
@@ -42,11 +50,11 @@ MasterCoordination::~MasterCoordination() {
   }
 }
 
-Endpoint MasterCoordination::GetEndpoint(int worker_id) const {
+Endpoint MasterCoordination::GetEndpoint(int worker_id) {
   std::lock_guard<std::mutex> guard(lock_);
   auto found = workers_.find(worker_id);
-  CHECK(found != workers_.end())
-      << "No endpoint registered for worker id: " << worker_id;
+  CHECK(found != workers_.end()) << "No endpoint registered for worker id: "
+                                 << worker_id;
   return found->second;
 }
 }  // namespace distributed
diff --git a/src/distributed/coordination_master.hpp b/src/distributed/coordination_master.hpp
index 6e95d6787..2976b5b59 100644
--- a/src/distributed/coordination_master.hpp
+++ b/src/distributed/coordination_master.hpp
@@ -5,6 +5,7 @@
 
 #include "communication/messaging/distributed.hpp"
 #include "communication/rpc/rpc.hpp"
+#include "distributed/coordination.hpp"
 #include "io/network/endpoint.hpp"
 
 namespace distributed {
@@ -12,7 +13,7 @@ using Endpoint = io::network::Endpoint;
 
 /** Handles worker registration, getting of other workers' endpoints and
  * coordinated shutdown in a distributed memgraph. Master side. */
-class MasterCoordination {
+class MasterCoordination : public Coordination {
   /**
    * Registers a new worker with this master server. Notifies all the known
    * workers of the new worker.
@@ -32,7 +33,7 @@ class MasterCoordination {
   ~MasterCoordination();
 
   /** Returns the Endpoint for the given worker_id. */
-  Endpoint GetEndpoint(int worker_id) const;
+  Endpoint GetEndpoint(int worker_id) override;
 
  private:
   communication::messaging::System &system_;
diff --git a/src/distributed/coordination_worker.hpp b/src/distributed/coordination_worker.hpp
index fb6280bb8..55c411d5d 100644
--- a/src/distributed/coordination_worker.hpp
+++ b/src/distributed/coordination_worker.hpp
@@ -1,15 +1,16 @@
 #pragma once
 
 #include "data_structures/concurrent/concurrent_map.hpp"
+#include "distributed/coordination.hpp"
 #include "distributed/coordination_rpc_messages.hpp"
-#include "io/network/endpoint.hpp"
 
 namespace distributed {
-using Endpoint = io::network::Endpoint;
 
 /** Handles worker registration, getting of other workers' endpoints and
  * coordinated shutdown in a distributed memgraph. Worker side. */
-class WorkerCoordination {
+class WorkerCoordination : public Coordination {
+  using Endpoint = io::network::Endpoint;
+
  public:
   WorkerCoordination(communication::messaging::System &system,
                      const Endpoint &master_endpoint);
@@ -23,7 +24,7 @@ class WorkerCoordination {
   int RegisterWorker(int desired_worker_id = -1);
 
   /** Gets the endpoint for the given worker ID from the master. */
-  Endpoint GetEndpoint(int worker_id);
+  Endpoint GetEndpoint(int worker_id) override;
 
   /** Starts listening for a remote shutdown command (issued by the master).
    * Blocks the calling thread until that has finished. */
@@ -33,6 +34,6 @@ class WorkerCoordination {
   communication::messaging::System &system_;
   communication::rpc::Client client_;
   communication::rpc::Server server_;
-  ConcurrentMap<int, Endpoint> endpoint_cache_;
+  mutable ConcurrentMap<int, Endpoint> endpoint_cache_;
 };
 }  // namespace distributed
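
With the rewritten `RegisterWorker`, ID assignment is: honor the desired ID if it is free, otherwise fall back to the lowest free ID starting from 1 (0 is reserved for the master). An illustrative trace, with `e_a`..`e_c` standing in for worker endpoints:

    distributed::MasterCoordination coord{system};  // registers itself as 0
    coord.RegisterWorker(5, e_a);   // -> 5 (desired ID is free)
    coord.RegisterWorker(5, e_b);   // -> 1 (5 is taken; lowest free ID >= 1)
    coord.RegisterWorker(-1, e_c);  // -> 2 (no desired ID)
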
diff --git a/src/distributed/remote_cache.hpp b/src/distributed/remote_cache.hpp
new file mode 100644
index 000000000..e24647ac6
--- /dev/null
+++ b/src/distributed/remote_cache.hpp
@@ -0,0 +1,79 @@
+#pragma once
+
+#include <mutex>
+#include <unordered_map>
+
+#include "glog/logging.h"
+
+#include "distributed/remote_data_rpc_clients.hpp"
+#include "storage/gid.hpp"
+#include "transactions/transaction.hpp"
+
+namespace distributed {
+
+/**
+ * Used for caching Vertices and Edges that are stored on another worker in a
+ * distributed system. Maps global IDs to (old, new) Vertex/Edge pointer
+ * pairs. It is possible that either "old" or "new" is a nullptr, but at
+ * least one must be non-null. The RemoteCache is the owner of the TRecord
+ * objects it points to. This class is thread-safe, because its owning
+ * GraphDbAccessor is thread-safe.
+ *
+ * @tparam TRecord - Edge or Vertex
+ */
+template <typename TRecord>
+class RemoteCache {
+  using rec_uptr = std::unique_ptr<TRecord>;
+
+ public:
+  RemoteCache(distributed::RemoteDataRpcClients &remote_data_clients)
+      : remote_data_clients_(remote_data_clients) {}
+
+  /**
+   * Returns the "new" Vertex/Edge for the given gid.
+   *
+   * @param gid - global ID.
+   * @param init_if_necessary - If "new" is not initialized and this flag is
+   * set, then "new" is initialized with a copy of "old" before returning.
+   */
+  // TODO most likely remove this function in the new remote_data_comm arch
+  TRecord *FindNew(gid::Gid gid, bool init_if_necessary) {
+    auto found = cache_.find(gid);
+    DCHECK(found != cache_.end()) << "Uninitialized remote Vertex/Edge";
+    auto &pair = found->second;
+    if (!pair.second && init_if_necessary) {
+      pair.second = std::unique_ptr<TRecord>(pair.first->CloneData());
+    }
+    return pair.second.get();
+  }
+
+  /**
+   * For the Vertex/Edge with the given global ID, looks for the data visible
+   * from the given transaction's ID and command ID, and caches it. Sets the
+   * given pointers to point to the fetched data. Analogue to
+   * mvcc::VersionList::find_set_old_new.
+   */
+  void FindSetOldNew(tx::transaction_id_t tx_id, int worker_id, gid::Gid gid,
+                     TRecord *&old_record, TRecord *&new_record) {
+    std::lock_guard<std::mutex> guard{lock_};
+    auto found = cache_.find(gid);
+    if (found == cache_.end()) {
+      rec_uptr fetched =
+          remote_data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
+      found = cache_.emplace(
+          gid, std::make_pair<rec_uptr, rec_uptr>(nullptr, nullptr)).first;
+      found->second.first.swap(fetched);
+    }
+
+    old_record = found->second.first.get();
+    new_record = found->second.second.get();
+  }
+
+ private:
+  std::mutex lock_;
+  distributed::RemoteDataRpcClients &remote_data_clients_;
+  // TODO it'd be better if we had VertexData and EdgeData in here, as opposed
+  // to Vertex and Edge.
+  std::unordered_map<gid::Gid, std::pair<rec_uptr, rec_uptr>> cache_;
+};
+}  // namespace distributed
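
How the cache is driven in practice (this mirrors the `RecordAccessor::Reconstruct` change later in this diff, and ignores access control — in the real class the caches are reached through friend accessors): the first call for a gid fetches the committed record over RPC and caches it; later calls are served locally.

    Vertex *old_rec = nullptr;
    Vertex *new_rec = nullptr;
    dba.remote_vertices().FindSetOldNew(dba.transaction().id_, owner_worker_id,
                                        gid, old_rec, new_rec);
    // old_rec now points at the cached committed version; new_rec stays
    // nullptr until there are local modifications.
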
diff --git a/src/distributed/remote_data_rpc_clients.hpp b/src/distributed/remote_data_rpc_clients.hpp
new file mode 100644
index 000000000..0f34114a6
--- /dev/null
+++ b/src/distributed/remote_data_rpc_clients.hpp
@@ -0,0 +1,84 @@
+#pragma once
+
+#include <mutex>
+#include <utility>
+
+#include "communication/messaging/distributed.hpp"
+#include "communication/rpc/rpc.hpp"
+#include "database/state_delta.hpp"
+#include "distributed/coordination.hpp"
+#include "distributed/remote_data_rpc_messages.hpp"
+#include "storage/gid.hpp"
+#include "transactions/type.hpp"
+
+namespace distributed {
+
+/** Provides access to other workers' data. */
+class RemoteDataRpcClients {
+  using Client = communication::rpc::Client;
+
+ public:
+  RemoteDataRpcClients(communication::messaging::System &system,
+                       Coordination &coordination)
+      : system_(system), coordination_(coordination) {}
+
+  /// Returns a remote worker's data for the given params. That worker must own
+  /// the vertex for the given id, and that vertex must be visible in the given
+  /// transaction.
+  std::unique_ptr<Vertex> RemoteVertex(int worker_id,
+                                       tx::transaction_id_t tx_id,
+                                       gid::Gid gid) {
+    auto response = RemoteDataClient(worker_id).Call<RemoteVertexRpc>(
+        kRemoteDataRpcTimeout, TxGidPair{tx_id, gid});
+    return std::move(response->name_output_);
+  }
+
+  /// Returns a remote worker's data for the given params. That worker must own
+  /// the edge for the given id, and that edge must be visible in the given
+  /// transaction.
+  std::unique_ptr<Edge> RemoteEdge(int worker_id, tx::transaction_id_t tx_id,
+                                   gid::Gid gid) {
+    auto response = RemoteDataClient(worker_id).Call<RemoteEdgeRpc>(
+        kRemoteDataRpcTimeout, TxGidPair{tx_id, gid});
+    return std::move(response->name_output_);
+  }
+
+  template <typename TRecord>
+  std::unique_ptr<TRecord> RemoteElement(int worker_id,
+                                         tx::transaction_id_t tx_id,
+                                         gid::Gid gid);
+
+ private:
+  communication::messaging::System &system_;
+  // TODO make Coordination const; its member GetEndpoint must be const too.
+  Coordination &coordination_;
+
+  std::unordered_map<int, Client> clients_;
+  std::mutex lock_;
+
+  Client &RemoteDataClient(int worker_id) {
+    std::lock_guard<std::mutex> guard{lock_};
+    auto found = clients_.find(worker_id);
+    if (found != clients_.end()) return found->second;
+    return clients_
+        .emplace(
+            std::piecewise_construct, std::forward_as_tuple(worker_id),
+            std::forward_as_tuple(system_, coordination_.GetEndpoint(worker_id),
+                                  kRemoteDataRpcName))
+        .first->second;
+  }
+};
+
+template <>
+inline std::unique_ptr<Edge> RemoteDataRpcClients::RemoteElement(
+    int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
+  return RemoteEdge(worker_id, tx_id, gid);
+}
+
+template <>
+inline std::unique_ptr<Vertex> RemoteDataRpcClients::RemoteElement(
+    int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
+  return RemoteVertex(worker_id, tx_id, gid);
+}
+
+}  // namespace distributed
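
The `RemoteElement<TRecord>` specializations give template code a single entry point; both calls below lazily create (and then reuse) the per-worker RPC client:

    std::unique_ptr<Vertex> v =
        clients.RemoteElement<Vertex>(worker_id, tx_id, vertex_gid);
    std::unique_ptr<Edge> e =
        clients.RemoteElement<Edge>(worker_id, tx_id, edge_gid);
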
diff --git a/src/distributed/remote_data_rpc_messages.hpp b/src/distributed/remote_data_rpc_messages.hpp
new file mode 100644
index 000000000..b3ad39ecf
--- /dev/null
+++ b/src/distributed/remote_data_rpc_messages.hpp
@@ -0,0 +1,74 @@
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "communication/messaging/local.hpp"
+#include "communication/rpc/rpc.hpp"
+#include "distributed/serialization.hpp"
+#include "storage/edge.hpp"
+#include "storage/gid.hpp"
+#include "storage/vertex.hpp"
+#include "transactions/type.hpp"
+#include "utils/rpc_pimp.hpp"
+
+namespace distributed {
+
+const std::string kRemoteDataRpcName = "RemoteDataRpc";
+const auto kRemoteDataRpcTimeout = 100ms;
+
+struct TxGidPair {
+  tx::transaction_id_t tx_id;
+  gid::Gid gid;
+
+ private:
+  friend class boost::serialization::access;
+
+  template <class TArchive>
+  void serialize(TArchive &ar, unsigned int) {
+    ar &tx_id;
+    ar &gid;
+  }
+};
+
+#define MAKE_RESPONSE(type, name)                                       \
+  class Remote##type##Res : public communication::messaging::Message {  \
+   public:                                                              \
+    Remote##type##Res() {}                                              \
+    Remote##type##Res(const type *name, int worker_id)                  \
+        : name_input_(name), worker_id_(worker_id) {}                   \
+                                                                        \
+    template <class TArchive>                                           \
+    void save(TArchive &ar, unsigned int) const {                       \
+      ar << boost::serialization::base_object<                          \
+          const communication::messaging::Message>(*this);              \
+      Save##type(ar, *name_input_, worker_id_);                         \
+    }                                                                   \
+                                                                        \
+    template <class TArchive>                                           \
+    void load(TArchive &ar, unsigned int) {                             \
+      ar >> boost::serialization::base_object<                          \
+                communication::messaging::Message>(*this);              \
+      auto v = Load##type(ar);                                          \
+      v.swap(name_output_);                                             \
+    }                                                                   \
+    BOOST_SERIALIZATION_SPLIT_MEMBER()                                  \
+                                                                        \
+    const type *name_input_;                                            \
+    int worker_id_;                                                     \
+    std::unique_ptr<type> name_output_;                                 \
+  };
+
+MAKE_RESPONSE(Vertex, vertex)
+MAKE_RESPONSE(Edge, edge)
+
+#undef MAKE_RESPONSE
+
+RPC_SINGLE_MEMBER_MESSAGE(RemoteVertexReq, TxGidPair);
+RPC_SINGLE_MEMBER_MESSAGE(RemoteEdgeReq, TxGidPair);
+
+using RemoteVertexRpc =
+    communication::rpc::RequestResponse<RemoteVertexReq, RemoteVertexRes>;
+using RemoteEdgeRpc =
+    communication::rpc::RequestResponse<RemoteEdgeReq, RemoteEdgeRes>;
+
+}  // namespace distributed
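
One subtlety in `MAKE_RESPONSE`: because the `name` parameter is only used as a constructor argument name, every generated response type has the same members, literally `name_input_` (non-owning, sender side) and `name_output_` (owning, receiver side). That is why the RPC clients above read `response->name_output_`. Conceptually:

    // Sender: serializes *name_input_ via SaveVertex(ar, ...).
    RemoteVertexRes res{&vertex, worker_id};
    // Receiver, after deserialization: takes ownership of the copy.
    std::unique_ptr<Vertex> v = std::move(res.name_output_);
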
+#include "distributed/remote_data_rpc_messages.hpp" +#include "transactions/type.hpp" + +namespace distributed { + +/** Serves this worker's data to others. */ +class RemoteDataRpcServer { + // TODO maybe reuse GraphDbAccessors. It would reduce the load on tx::Engine + // locks (not sure what the gain would be). But have some way of cache + // invalidation. + public: + RemoteDataRpcServer(database::GraphDb &db, + communication::messaging::System &system) + : db_(db), system_(system) { + rpc_server_.Register<RemoteVertexRpc>([this](const RemoteVertexReq &req) { + database::GraphDbAccessor dba(db_, req.member.tx_id); + auto vertex = dba.FindVertexChecked(req.member.gid, false); + CHECK(vertex.GetOld()) + << "Old record must exist when sending vertex by RPC"; + return std::make_unique<RemoteVertexRes>(vertex.GetOld(), db_.WorkerId()); + }); + + rpc_server_.Register<RemoteEdgeRpc>([this](const RemoteEdgeReq &req) { + database::GraphDbAccessor dba(db_, req.member.tx_id); + auto edge = dba.FindEdgeChecked(req.member.gid, false); + CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC"; + return std::make_unique<RemoteEdgeRes>(edge.GetOld(), db_.WorkerId()); + }); + } + + private: + database::GraphDb &db_; + communication::messaging::System &system_; + communication::rpc::Server rpc_server_{system_, kRemoteDataRpcName}; +}; +} // namespace distributed diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp index 0a0096e8b..1dc8a4783 100644 --- a/src/storage/record_accessor.cpp +++ b/src/storage/record_accessor.cpp @@ -148,7 +148,8 @@ bool RecordAccessor<TRecord>::Reconstruct() const { address_.local()->find_set_old_new(db_accessor_->transaction(), old_, new_); } else { db_accessor().template remote_elements<TRecord>().FindSetOldNew( - db_accessor().transaction(), address_.global_id(), old_, new_); + db_accessor().transaction().id_, address_.worker_id(), + address_.global_id(), old_, new_); } current_ = old_ ? old_ : new_; return old_ != nullptr || new_ != nullptr; @@ -178,8 +179,8 @@ TRecord &RecordAccessor<TRecord>::update() const { new_ = address_.local()->update(t); DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update"; } else { - new_ = db_accessor().template remote_elements<TRecord>().FindNew( - address_.global_id(), true); + // TODO implement + throw std::runtime_error("Not yet implemented"); } return *new_; } diff --git a/src/storage/record_accessor.hpp b/src/storage/record_accessor.hpp index 07281eb0c..faab5f1b1 100644 --- a/src/storage/record_accessor.hpp +++ b/src/storage/record_accessor.hpp @@ -105,6 +105,9 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> { */ RecordAccessor<TRecord> &SwitchNew(); + /** Returns the new record pointer. */ + TRecord *GetNew() const { return new_; } + /** * Attempts to switch this accessor to use the latest version not updated by * the current transaction+command. If that is not possible (vertex/edge was @@ -115,6 +118,9 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> { */ RecordAccessor<TRecord> &SwitchOld(); + /** Returns the old record pointer. */ + TRecord *GetOld() const { return old_; } + /** * Reconstructs the internal state of the record accessor so it uses the * versions appropriate to this transaction+command. 
diff --git a/src/storage/vertex_accessor.hpp b/src/storage/vertex_accessor.hpp
index 6804b5520..a57964193 100644
--- a/src/storage/vertex_accessor.hpp
+++ b/src/storage/vertex_accessor.hpp
@@ -51,7 +51,7 @@ class VertexAccessor : public RecordAccessor<Vertex> {
  public:
   VertexAccessor(VertexAddress address, database::GraphDbAccessor &db_accessor)
       : RecordAccessor(address, db_accessor) {
-    RecordAccessor::Reconstruct();
+    Reconstruct();
   }
 
   /** Returns the number of outgoing edges. */
diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp
index 93cb66fdd..3d58af697 100644
--- a/src/transactions/engine_worker.cpp
+++ b/src/transactions/engine_worker.cpp
@@ -61,7 +61,6 @@ tx::Transaction *WorkerEngine::RunningTransaction(tx::transaction_id_t tx_id) {
       std::move(rpc_client_.Call<SnapshotRpc>(kRpcTimeout, tx_id)->member));
   auto insertion =
       accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this));
-  CHECK(insertion.second) << "Transaction already inserted";
   utils::EnsureAtomicGe(local_last_, tx_id);
   return insertion.first->second;
 }
diff --git a/src/transactions/snapshot.hpp b/src/transactions/snapshot.hpp
index db8c873d4..03a3dbddf 100644
--- a/src/transactions/snapshot.hpp
+++ b/src/transactions/snapshot.hpp
@@ -51,8 +51,7 @@ class Snapshot {
 
   /** Removes the given transaction id from this Snapshot.
    *
-   * @param id - the transaction id to remove
-   */
+   * @param id - the transaction id to remove */
   void remove(transaction_id_t id) {
     auto last =
         std::remove(transaction_ids_.begin(), transaction_ids_.end(), id);
diff --git a/src/utils/serialization.hpp b/src/utils/serialization.hpp
index ed497fb99..0dc7f8deb 100644
--- a/src/utils/serialization.hpp
+++ b/src/utils/serialization.hpp
@@ -1,3 +1,5 @@
+#pragma once
+
 #include <experimental/optional>
 
 #include "boost/serialization/split_free.hpp"
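
On the dropped `CHECK(insertion.second)` in `engine_worker.cpp` above (a reading of the change, not stated in the patch): two RPC handlers can now race through `RunningTransaction()` for the same `tx_id`; both construct a `Transaction`, only one insert into the concurrent map wins, and the loser must tolerate a failed insert and return the winning entry. Note that the losing `new Transaction` is not freed unless the map disposes of it, which may be worth a follow-up. The new end-to-end test below exercises this path by opening worker-side accessors on a master-started transaction.

    // Tolerant insert pattern now used by WorkerEngine::RunningTransaction:
    auto insertion =
        accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this));
    return insertion.first->second;  // valid whether or not this call inserted
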
diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp
new file mode 100644
index 000000000..3a1a285f5
--- /dev/null
+++ b/tests/unit/distributed_graph_db.cpp
@@ -0,0 +1,203 @@
+#include <experimental/optional>
+#include <thread>
+
+#include "gtest/gtest.h"
+
+#include "communication/messaging/distributed.hpp"
+#include "database/graph_db.hpp"
+#include "distributed/coordination.hpp"
+#include "distributed/coordination_master.hpp"
+#include "distributed/coordination_worker.hpp"
+#include "distributed/remote_data_rpc_clients.hpp"
+#include "distributed/remote_data_rpc_server.hpp"
+#include "io/network/endpoint.hpp"
+#include "transactions/engine_master.hpp"
+
+template <typename T>
+using optional = std::experimental::optional<T>;
+
+using namespace distributed;
+
+class DistributedGraphDbTest : public ::testing::Test {
+  const std::string kLocal = "127.0.0.1";
+
+  class WorkerInThread {
+   public:
+    WorkerInThread(database::Config config) : worker_(config) {
+      thread_ = std::thread([this, config] { worker_.WaitForShutdown(); });
+    }
+
+    ~WorkerInThread() {
+      if (thread_.joinable()) thread_.join();
+    }
+
+    database::Worker worker_;
+    std::thread thread_;
+  };
+
+ protected:
+  void SetUp() override {
+    const auto kInitTime = 200ms;
+
+    database::Config master_config;
+    master_config.master_endpoint = {kLocal, 0};
+    master_.emplace(master_config);
+    std::this_thread::sleep_for(kInitTime);
+
+    auto worker_config = [this](int worker_id) {
+      database::Config config;
+      config.worker_id = worker_id;
+      config.master_endpoint = master_->endpoint();
+      config.worker_endpoint = {kLocal, 0};
+      return config;
+    };
+
+    worker1_.emplace(worker_config(1));
+    std::this_thread::sleep_for(kInitTime);
+    worker2_.emplace(worker_config(2));
+    std::this_thread::sleep_for(kInitTime);
+  }
+
+  void TearDown() override {
+    // Kill master first because it will expect a shutdown response from the
+    // workers.
+    master_ = std::experimental::nullopt;
+
+    worker2_ = std::experimental::nullopt;
+    worker1_ = std::experimental::nullopt;
+  }
+
+  database::Master &master() { return *master_; }
+  auto &master_tx_engine() {
+    return dynamic_cast<tx::MasterEngine &>(master_->tx_engine());
+  }
+  database::Worker &worker1() { return worker1_->worker_; }
+  database::Worker &worker2() { return worker2_->worker_; }
+
+ private:
+  optional<database::Master> master_;
+  optional<WorkerInThread> worker1_;
+  optional<WorkerInThread> worker2_;
+};
+
+TEST_F(DistributedGraphDbTest, Coordination) {
+  EXPECT_NE(master().endpoint().port(), 0);
+  EXPECT_NE(worker1().endpoint().port(), 0);
+  EXPECT_NE(worker2().endpoint().port(), 0);
+
+  EXPECT_EQ(master().GetEndpoint(1), worker1().endpoint());
+  EXPECT_EQ(master().GetEndpoint(2), worker2().endpoint());
+  EXPECT_EQ(worker1().GetEndpoint(0), master().endpoint());
+  EXPECT_EQ(worker1().GetEndpoint(2), worker2().endpoint());
+  EXPECT_EQ(worker2().GetEndpoint(0), master().endpoint());
+  EXPECT_EQ(worker2().GetEndpoint(1), worker1().endpoint());
+}
+
+TEST_F(DistributedGraphDbTest, TxEngine) {
+  auto *tx1 = master_tx_engine().Begin();
+  auto *tx2 = master_tx_engine().Begin();
+  EXPECT_EQ(tx2->snapshot().size(), 1);
+  EXPECT_EQ(
+      worker1().tx_engine().RunningTransaction(tx1->id_)->snapshot().size(), 0);
+  EXPECT_EQ(worker2().tx_engine().RunningTransaction(tx2->id_)->snapshot(),
+            tx2->snapshot());
+
+  ::testing::FLAGS_gtest_death_test_style = "fast";
+  EXPECT_DEATH(worker2().tx_engine().RunningTransaction(123), "");
+}
+
+template <typename TType>
+using mapper_vec =
+    std::vector<std::reference_wrapper<storage::ConcurrentIdMapper<TType>>>;
+
+TEST_F(DistributedGraphDbTest, StorageTypes) {
+  auto test_mappers = [](auto mappers, auto ids) {
+    for (size_t i = 0; i < mappers.size(); ++i) {
+      ids.emplace_back(
+          mappers[i].get().value_to_id("value" + std::to_string(i)));
+    }
+    EXPECT_GT(ids.size(), 0);
+    for (size_t i = 0; i < mappers.size(); ++i) {
+      for (size_t j = 0; j < ids.size(); ++j) {
+        EXPECT_EQ(mappers[i].get().id_to_value(ids[j]),
+                  "value" + std::to_string(j));
+      }
+    }
+  };
+
+  test_mappers(mapper_vec<storage::Label>{master().label_mapper(),
+                                          worker1().label_mapper(),
+                                          worker2().label_mapper()},
+               std::vector<storage::Label>{});
+  test_mappers(mapper_vec<storage::EdgeType>{master().edge_type_mapper(),
+                                             worker1().edge_type_mapper(),
+                                             worker2().edge_type_mapper()},
+               std::vector<storage::EdgeType>{});
+  test_mappers(mapper_vec<storage::Property>{master().property_mapper(),
+                                             worker1().property_mapper(),
+                                             worker2().property_mapper()},
+               std::vector<storage::Property>{});
+}
+
+TEST_F(DistributedGraphDbTest, Counters) {
+  EXPECT_EQ(master().counters().Get("a"), 0);
+  EXPECT_EQ(worker1().counters().Get("a"), 1);
+  EXPECT_EQ(worker2().counters().Get("a"), 2);
+
+  EXPECT_EQ(worker1().counters().Get("b"), 0);
+  EXPECT_EQ(worker2().counters().Get("b"), 1);
+  EXPECT_EQ(master().counters().Get("b"), 2);
+}
+
+TEST_F(DistributedGraphDbTest, RemoteDataGetting) {
+  using GraphDbAccessor = database::GraphDbAccessor;
+  // Only old data is visible remotely, so create and commit some data.
+  gid::Gid v1_id, v2_id, e1_id;
+
+  {
+    GraphDbAccessor dba{master()};
+    auto v1 = dba.InsertVertex();
+    auto v2 = dba.InsertVertex();
+    auto e1 = dba.InsertEdge(v1, v2, dba.EdgeType("et"));
+
+    // Set some data so we see we're getting the right stuff.
+    v1.PropsSet(dba.Property("p1"), 42);
+    v1.add_label(dba.Label("label"));
+    v2.PropsSet(dba.Property("p2"), "value");
+    e1.PropsSet(dba.Property("p3"), true);
+
+    v1_id = v1.gid();
+    v2_id = v2.gid();
+    e1_id = e1.gid();
+
+    dba.Commit();
+  }
+
+  // The master must start a transaction before workers can work in it.
+  database::GraphDbAccessor master_dba{master()};
+
+  {
+    database::GraphDbAccessor w1_dba{worker1(), master_dba.transaction_id()};
+    VertexAccessor v1_in_w1{{v1_id, 0}, w1_dba};
+    EXPECT_NE(v1_in_w1.GetOld(), nullptr);
+    EXPECT_EQ(v1_in_w1.GetNew(), nullptr);
+    EXPECT_EQ(v1_in_w1.PropsAt(w1_dba.Property("p1")).Value<int64_t>(), 42);
+    EXPECT_TRUE(v1_in_w1.has_label(w1_dba.Label("label")));
+  }
+
+  {
+    database::GraphDbAccessor w2_dba{worker2(), master_dba.transaction_id()};
+    VertexAccessor v2_in_w2{{v2_id, 0}, w2_dba};
+    EXPECT_NE(v2_in_w2.GetOld(), nullptr);
+    EXPECT_EQ(v2_in_w2.GetNew(), nullptr);
+    EXPECT_EQ(v2_in_w2.PropsAt(w2_dba.Property("p2")).Value<std::string>(),
+              "value");
+    EXPECT_FALSE(v2_in_w2.has_label(w2_dba.Label("label")));
+
+    VertexAccessor v1_in_w2{{v1_id, 0}, w2_dba};
+    EdgeAccessor e1_in_w2{{e1_id, 0}, w2_dba};
+    EXPECT_EQ(e1_in_w2.from(), v1_in_w2);
+    EXPECT_EQ(e1_in_w2.to(), v2_in_w2);
+    EXPECT_EQ(e1_in_w2.EdgeType(), w2_dba.EdgeType("et"));
+    EXPECT_EQ(e1_in_w2.PropsAt(w2_dba.Property("p3")).Value<bool>(), true);
+  }
+}