Split GraphDb to distributed and single node files

Summary:
This change, hopefully, simplifies the implementation of different kinds
of GraphDb. The pimpl idiom is now simplified by removing all of the
convoluted inheritance. Implementation classes are just plain data
stores, without any methods. The interface classes now have a flatter
hierarchy:

```
    GraphDb (pure interface)
         |
    +----+---------- DistributedGraphDb (pure interface)
    |                         |
Single Node             +-----+------+
                        |            |
                      Master       Worker
```
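
Condensed into code, the new hierarchy looks roughly like the sketch
below. Only a handful of representative members are shown; the full
interfaces live in graph_db.hpp and distributed_graph_db.hpp further
down in this diff, and the stand-in forward declarations only take the
place of the real headers.

```
// Condensed sketch of the new interfaces; the real declarations have many
// more pure virtual members (wal(), counters(), MakeSnapshot(), ...).
#include <vector>

namespace tx { class Engine; }                // transaction engine (stand-in)
namespace distributed { class DataManager; }  // distributed-only service (stand-in)

namespace database {

class Storage;  // stand-in for the real storage declaration

// Pure interface: only what every kind of database supports.
class GraphDb {
 public:
  virtual ~GraphDb() {}
  virtual Storage &storage() = 0;
  virtual tx::Engine &tx_engine() = 0;
  virtual std::vector<int> GetWorkerIds() const = 0;
};

// Intermediate pure interface that adds the distributed-only accessors.
class DistributedGraphDb : public GraphDb {
 public:
  virtual distributed::DataManager &data_manager() = 0;
};

// SingleNode derives directly from GraphDb, while Master and Worker derive
// from DistributedGraphDb; each owns a plain-data impl::* struct via pimpl.

}  // namespace database
```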

DistributedGraphDb is used as an intermediate interface for everything
that should work only in the distributed version. Therefore, virtual
calls for distributed facilities have been removed from GraphDb. Some
are exposed via DistributedGraphDb, others only in the concrete Master
and Worker classes. The code which relied on those virtual calls has
been refactored to either use DistributedGraphDb, take a pointer to what
it actually needs, or use dynamic_cast. Obviously, dynamic_cast is a
temporary solution and should be replaced with another mechanism (e.g. a
virtual call or some other function-pointer style).
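
At such call sites the temporary dynamic_cast fallback looks roughly
like the sketch below (adapted from the GraphDbAccessor changes later in
this diff; the free function and its name are only illustrative):

```
#include <glog/logging.h>

#include "database/distributed_graph_db.hpp"

// Hypothetical wrapper showing the pattern used at refactored call sites.
void UseDataManager(database::GraphDb &db) {
  distributed::DataManager *data_manager = nullptr;
  // TODO: Replace this with virtual call or some other mechanism.
  if (auto *distributed_db =
          dynamic_cast<database::DistributedGraphDb *>(&db)) {
    data_manager = &distributed_db->data_manager();
  }
  CHECK(data_manager) << "Expected a distributed GraphDb";
  // ... use *data_manager ...
}
```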

The cost of the above change is some code duplication in the
constructors and destructors of the concrete classes. This duplication
involves a lot of little tweaks that make it hard to generalize, not to
mention that virtual calls do not dispatch to the derived class inside
constructors and destructors. If we really care about generalizing
this, we should think about abandoning RAII in favor of a constructor +
Init method.
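
For reference, a hypothetical sketch of that constructor + Init style is
below; none of these names exist in the codebase, it only illustrates
moving virtual-call-sensitive setup out of constructors:

```
#include <memory>
#include <utility>

// Hypothetical two-phase construction: the constructor only stores data,
// Init() runs once the most-derived object exists, so virtual calls
// dispatch to the real type.
class BaseDb {
 public:
  virtual ~BaseDb() {}
  virtual void Init() {}  // phase two of construction
};

class WorkerDb : public BaseDb {
 public:
  void Init() override {
    // e.g. register with the cluster, run recovery, enable the WAL...
  }
};

// Factory that enforces the construct-then-Init protocol.
template <class TDb, class... TArgs>
std::unique_ptr<BaseDb> MakeDb(TArgs &&... args) {
  auto db = std::make_unique<TDb>(std::forward<TArgs>(args)...);
  db->Init();  // safe: TDb is fully constructed at this point
  return db;
}
```

A caller would then construct through the factory, e.g.
MakeDb<WorkerDb>(), instead of calling the constructor directly.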

The next logical steps for splitting the dependencies are:

  1) Split the GraphDbAccessor implementation, either via inheritance or
     by passing in an implementation pointer. GraphDbAccessor should
     then only be created through a virtual call on GraphDb (see the
     sketch after this list).
  2) Split the Interpreter implementation. Besides allowing the single
     node interpreter to exist without depending on distributed code,
     this will enable the planner and operators to be correctly
     separated.
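
A hypothetical shape for step 1, where GraphDb hands out its own
accessor through a virtual call (the Access() name and return type are
made up, not current API):

```
#include <memory>

namespace database {

class GraphDbAccessor;

class GraphDb {
 public:
  virtual ~GraphDb() {}
  // Each concrete GraphDb would return an accessor wired to its own
  // implementation, instead of GraphDbAccessor(GraphDb &) doing the work.
  virtual std::unique_ptr<GraphDbAccessor> Access() = 0;
};

}  // namespace database
```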

Reviewers: msantl, mferencevic, ipaljak

Reviewed By: msantl

Subscribers: dgleich, pullbot

Differential Revision: https://phabricator.memgraph.io/D1493
Teon Banek 2018-07-19 17:00:50 +02:00
parent 4a3808b982
commit 2c50ea41d5
31 changed files with 1061 additions and 782 deletions


@ -13,6 +13,7 @@ set(memgraph_src_files
data_structures/concurrent/skiplist_gc.cpp
database/config.cpp
database/counters.cpp
database/distributed_graph_db.cpp
database/graph_db.cpp
database/graph_db_accessor.cpp
database/state_delta.cpp


@ -0,0 +1,495 @@
#include "database/distributed_graph_db.hpp"
#include "database/storage_gc_master.hpp"
#include "database/storage_gc_worker.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "distributed/bfs_rpc_server.hpp"
#include "distributed/bfs_subcursor.hpp"
#include "distributed/cluster_discovery_master.hpp"
#include "distributed/cluster_discovery_worker.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp"
#include "distributed/data_manager.hpp"
#include "distributed/data_rpc_server.hpp"
#include "distributed/durability_rpc_clients.hpp"
#include "distributed/durability_rpc_server.hpp"
#include "distributed/index_rpc_server.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "distributed/token_sharing_rpc_server.hpp"
#include "distributed/transactional_cache_cleaner.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "durability/snapshooter.hpp"
#include "storage/concurrent_id_mapper.hpp"
#include "storage/concurrent_id_mapper_master.hpp"
#include "storage/concurrent_id_mapper_worker.hpp"
#include "transactions/engine_master.hpp"
#include "utils/file.hpp"
using namespace std::literals::chrono_literals;
namespace database {
namespace impl {
template <template <typename TId> class TMapper>
struct TypemapPack {
template <typename... TMapperArgs>
explicit TypemapPack(TMapperArgs &... args)
: label(args...), edge_type(args...), property(args...) {}
// TODO this should also be garbage collected
TMapper<storage::Label> label;
TMapper<storage::EdgeType> edge_type;
TMapper<storage::Property> property;
};
// Master
class Master {
public:
explicit Master(const Config &config, DistributedGraphDb *self)
: config_(config), self_(self) {}
Config config_;
std::unique_ptr<Storage> storage_ =
std::make_unique<Storage>(config_.worker_id, config_.properties_on_disk);
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
// TODO: Some things may depend on order of construction/destruction. We also
// have a lot of circular pointers among members. It would be a good idea to
// clean the mess. Also, be careful of virtual calls to `self_` in
// constructors of members.
DistributedGraphDb *self_{nullptr};
communication::rpc::Server server_{
config_.master_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
tx::MasterEngine tx_engine_{server_, rpc_worker_clients_, &wal_};
distributed::MasterCoordination coordination_{server_.endpoint()};
std::unique_ptr<StorageGcMaster> storage_gc_ =
std::make_unique<StorageGcMaster>(
*storage_, tx_engine_, config_.gc_cycle_sec, server_, coordination_);
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
TypemapPack<storage::MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{server_};
distributed::BfsSubcursorStorage subcursor_storage_{self_,
&bfs_subcursor_clients_};
distributed::BfsRpcServer bfs_subcursor_server_{self_, &server_,
&subcursor_storage_};
distributed::BfsRpcClients bfs_subcursor_clients_{
self_, &subcursor_storage_, &rpc_worker_clients_, &data_manager_};
distributed::DurabilityRpcClients durability_rpc_clients_{
rpc_worker_clients_};
distributed::DataRpcServer data_server_{*self_, server_};
distributed::DataRpcClients data_clients_{rpc_worker_clients_};
distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_};
distributed::PullRpcClients pull_clients_{rpc_worker_clients_};
distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_};
distributed::UpdatesRpcServer updates_server_{*self_, server_};
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
distributed::DataManager data_manager_{*self_, data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{
tx_engine_, updates_server_, data_manager_};
distributed::ClusterDiscoveryMaster cluster_discovery_{server_, coordination_,
rpc_worker_clients_};
distributed::TokenSharingRpcClients token_sharing_clients_{
&rpc_worker_clients_};
distributed::TokenSharingRpcServer token_sharing_server_{
self_, config_.worker_id, &coordination_, &server_,
&token_sharing_clients_};
};
} // namespace impl
Master::Master(Config config)
: impl_(std::make_unique<impl::Master>(config, this)) {
if (impl_->config_.durability_enabled)
utils::CheckDir(impl_->config_.durability_directory);
// Durability recovery.
{
// What we recover.
std::experimental::optional<durability::RecoveryInfo> recovery_info;
// Recover only if necessary.
if (impl_->config_.db_recover_on_startup) {
recovery_info = durability::Recover(impl_->config_.durability_directory,
*this, std::experimental::nullopt);
}
// Post-recovery setup and checking.
impl_->coordination_.SetRecoveryInfo(recovery_info);
if (recovery_info) {
CHECK(impl_->config_.recovering_cluster_size > 0)
<< "Invalid cluster recovery size flag. Recovered cluster size "
"should be at least 1";
while (impl_->coordination_.CountRecoveredWorkers() !=
impl_->config_.recovering_cluster_size - 1) {
LOG(INFO) << "Waiting for workers to finish recovering..";
std::this_thread::sleep_for(2s);
}
}
}
// Start the dynamic graph partitioner inside token sharing server
if (impl_->config_.dynamic_graph_partitioner_enabled) {
impl_->token_sharing_server_.StartTokenSharing();
}
if (impl_->config_.durability_enabled) {
impl_->wal_.Enable();
snapshot_creator_ = std::make_unique<utils::Scheduler>();
snapshot_creator_->Run(
"Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
[this] {
GraphDbAccessor dba(*this);
MakeSnapshot(dba);
});
}
// Start transaction killer.
if (impl_->config_.query_execution_time_sec != -1) {
transaction_killer_.Run(
"TX killer",
std::chrono::seconds(std::max(
1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
[this]() {
impl_->tx_engine_.LocalForEachActiveTransaction(
[this](tx::Transaction &t) {
if (t.creation_time() +
std::chrono::seconds(
impl_->config_.query_execution_time_sec) <
std::chrono::steady_clock::now()) {
t.set_should_abort();
};
});
});
}
}
Master::~Master() {
snapshot_creator_ = nullptr;
is_accepting_transactions_ = false;
impl_->tx_engine_.LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
// We are not a worker, so we can do a snapshot on exit if it's enabled. Doing
// this on the master forces workers to do the same through rpcs
if (impl_->config_.snapshot_on_exit) {
GraphDbAccessor dba(*this);
MakeSnapshot(dba);
}
}
Storage &Master::storage() { return *impl_->storage_; }
durability::WriteAheadLog &Master::wal() { return impl_->wal_; }
tx::Engine &Master::tx_engine() { return impl_->tx_engine_; }
storage::ConcurrentIdMapper<storage::Label> &Master::label_mapper() {
return impl_->typemap_pack_.label;
}
storage::ConcurrentIdMapper<storage::EdgeType> &Master::edge_type_mapper() {
return impl_->typemap_pack_.edge_type;
}
storage::ConcurrentIdMapper<storage::Property> &Master::property_mapper() {
return impl_->typemap_pack_.property;
}
database::Counters &Master::counters() { return impl_->counters_; }
void Master::CollectGarbage() { impl_->storage_gc_->CollectGarbage(); }
int Master::WorkerId() const { return impl_->config_.worker_id; }
std::vector<int> Master::GetWorkerIds() const {
return impl_->coordination_.GetWorkerIds();
}
// Makes a local snapshot and forces the workers to do the same. Snapshot is
// written here only if workers successfully created their own snapshot
bool Master::MakeSnapshot(GraphDbAccessor &accessor) {
auto workers_snapshot =
impl_->durability_rpc_clients_.MakeSnapshot(accessor.transaction_id());
if (!workers_snapshot.get()) return false;
// This can be further optimized by creating master snapshot at the same
// time as workers snapshots but this forces us to delete the master
// snapshot if we succeed in creating it and workers somehow fail. Because
// we have an assumption that every snapshot that exists on master with
// some tx_id visibility also exists on workers
const bool status = durability::MakeSnapshot(
*this, accessor, fs::path(impl_->config_.durability_directory),
impl_->config_.snapshot_max_retained);
if (status) {
LOG(INFO) << "Snapshot created successfully.";
} else {
LOG(ERROR) << "Snapshot creation failed!";
}
return status;
}
void Master::ReinitializeStorage() {
// Release gc scheduler to stop it from touching storage
impl_->storage_gc_ = nullptr;
impl_->storage_ = std::make_unique<Storage>(
impl_->config_.worker_id, impl_->config_.properties_on_disk);
impl_->storage_gc_ = std::make_unique<StorageGcMaster>(
*impl_->storage_, impl_->tx_engine_, impl_->config_.gc_cycle_sec,
impl_->server_, impl_->coordination_);
}
io::network::Endpoint Master::endpoint() const {
return impl_->server_.endpoint();
}
io::network::Endpoint Master::GetEndpoint(int worker_id) {
return impl_->coordination_.GetEndpoint(worker_id);
}
distributed::BfsRpcClients &Master::bfs_subcursor_clients() {
return impl_->bfs_subcursor_clients_;
}
distributed::DataRpcClients &Master::data_clients() {
return impl_->data_clients_;
}
distributed::UpdatesRpcServer &Master::updates_server() {
return impl_->updates_server_;
}
distributed::UpdatesRpcClients &Master::updates_clients() {
return impl_->updates_clients_;
}
distributed::DataManager &Master::data_manager() {
return impl_->data_manager_;
}
distributed::PullRpcClients &Master::pull_clients() {
return impl_->pull_clients_;
}
distributed::PlanDispatcher &Master::plan_dispatcher() {
return impl_->plan_dispatcher_;
}
distributed::IndexRpcClients &Master::index_rpc_clients() {
return impl_->index_rpc_clients_;
}
// Worker
namespace impl {
class Worker {
public:
Config config_;
std::unique_ptr<Storage> storage_ =
std::make_unique<Storage>(config_.worker_id, config_.properties_on_disk);
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
explicit Worker(const Config &config, DistributedGraphDb *self)
: config_(config), self_(self) {
cluster_discovery_.RegisterWorker(config.worker_id);
}
// TODO: Some things may depend on order of construction/destruction. We also
// have a lot of circular pointers among members. It would be a good idea to
// clean the mess. Also, be careful of virtual calls to `self_` in
// constructors of members.
DistributedGraphDb *self_{nullptr};
communication::rpc::Server server_{
config_.worker_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
distributed::WorkerCoordination coordination_{server_,
config_.master_endpoint};
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
tx::WorkerEngine tx_engine_{rpc_worker_clients_.GetClientPool(0)};
std::unique_ptr<StorageGcWorker> storage_gc_ =
std::make_unique<StorageGcWorker>(
*storage_, tx_engine_, config_.gc_cycle_sec,
rpc_worker_clients_.GetClientPool(0), config_.worker_id);
TypemapPack<storage::WorkerConcurrentIdMapper> typemap_pack_{
rpc_worker_clients_.GetClientPool(0)};
database::WorkerCounters counters_{rpc_worker_clients_.GetClientPool(0)};
distributed::BfsSubcursorStorage subcursor_storage_{self_,
&bfs_subcursor_clients_};
distributed::BfsRpcServer bfs_subcursor_server_{self_, &server_,
&subcursor_storage_};
distributed::BfsRpcClients bfs_subcursor_clients_{
self_, &subcursor_storage_, &rpc_worker_clients_, &data_manager_};
distributed::DataRpcServer data_server_{*self_, server_};
distributed::DataRpcClients data_clients_{rpc_worker_clients_};
distributed::PlanConsumer plan_consumer_{server_};
distributed::ProduceRpcServer produce_server_{*self_, tx_engine_, server_,
plan_consumer_, &data_manager_};
distributed::IndexRpcServer index_rpc_server_{*self_, server_};
distributed::UpdatesRpcServer updates_server_{*self_, server_};
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
distributed::DataManager data_manager_{*self_, data_clients_};
distributed::WorkerTransactionalCacheCleaner cache_cleaner_{
tx_engine_, &wal_, server_,
produce_server_, updates_server_, data_manager_};
distributed::DurabilityRpcServer durability_rpc_server_{*self_, server_};
distributed::ClusterDiscoveryWorker cluster_discovery_{
server_, coordination_, rpc_worker_clients_.GetClientPool(0)};
distributed::TokenSharingRpcClients token_sharing_clients_{
&rpc_worker_clients_};
distributed::TokenSharingRpcServer token_sharing_server_{
self_, config_.worker_id, &coordination_, &server_,
&token_sharing_clients_};
};
} // namespace impl
Worker::Worker(Config config)
: impl_(std::make_unique<impl::Worker>(config, this)) {
if (impl_->config_.durability_enabled)
utils::CheckDir(impl_->config_.durability_directory);
// Durability recovery.
{
// What we should recover.
std::experimental::optional<durability::RecoveryInfo>
required_recovery_info(impl_->cluster_discovery_.recovery_info());
// What we recover.
std::experimental::optional<durability::RecoveryInfo> recovery_info;
// Recover only if necessary.
if (required_recovery_info) {
recovery_info = durability::Recover(impl_->config_.durability_directory,
*this, required_recovery_info);
}
// Post-recovery setup and checking.
if (required_recovery_info != recovery_info)
LOG(FATAL) << "Memgraph worker failed to recover the database state "
"recovered on the master";
impl_->cluster_discovery_.NotifyWorkerRecovered();
}
if (impl_->config_.durability_enabled) {
impl_->wal_.Enable();
}
// Start transaction killer.
if (impl_->config_.query_execution_time_sec != -1) {
transaction_killer_.Run(
"TX killer",
std::chrono::seconds(std::max(
1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
[this]() {
impl_->tx_engine_.LocalForEachActiveTransaction(
[this](tx::Transaction &t) {
if (t.creation_time() +
std::chrono::seconds(
impl_->config_.query_execution_time_sec) <
std::chrono::steady_clock::now()) {
t.set_should_abort();
};
});
});
}
}
Worker::~Worker() {
is_accepting_transactions_ = false;
impl_->tx_engine_.LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
}
Storage &Worker::storage() { return *impl_->storage_; }
durability::WriteAheadLog &Worker::wal() { return impl_->wal_; }
tx::Engine &Worker::tx_engine() { return impl_->tx_engine_; }
storage::ConcurrentIdMapper<storage::Label> &Worker::label_mapper() {
return impl_->typemap_pack_.label;
}
storage::ConcurrentIdMapper<storage::EdgeType> &Worker::edge_type_mapper() {
return impl_->typemap_pack_.edge_type;
}
storage::ConcurrentIdMapper<storage::Property> &Worker::property_mapper() {
return impl_->typemap_pack_.property;
}
database::Counters &Worker::counters() { return impl_->counters_; }
void Worker::CollectGarbage() { return impl_->storage_gc_->CollectGarbage(); }
int Worker::WorkerId() const { return impl_->config_.worker_id; }
std::vector<int> Worker::GetWorkerIds() const {
return impl_->coordination_.GetWorkerIds();
}
bool Worker::MakeSnapshot(GraphDbAccessor &accessor) {
// Makes a local snapshot from the visibility of accessor
const bool status = durability::MakeSnapshot(
*this, accessor, fs::path(impl_->config_.durability_directory),
impl_->config_.snapshot_max_retained);
if (status) {
LOG(INFO) << "Snapshot created successfully.";
} else {
LOG(ERROR) << "Snapshot creation failed!";
}
return status;
}
void Worker::ReinitializeStorage() {
// Release gc scheduler to stop it from touching storage
impl_->storage_gc_ = nullptr;
impl_->storage_ = std::make_unique<Storage>(
impl_->config_.worker_id, impl_->config_.properties_on_disk);
impl_->storage_gc_ = std::make_unique<StorageGcWorker>(
*impl_->storage_, impl_->tx_engine_, impl_->config_.gc_cycle_sec,
impl_->rpc_worker_clients_.GetClientPool(0), impl_->config_.worker_id);
}
io::network::Endpoint Worker::endpoint() const {
return impl_->server_.endpoint();
}
io::network::Endpoint Worker::GetEndpoint(int worker_id) {
return impl_->coordination_.GetEndpoint(worker_id);
}
void Worker::WaitForShutdown() {
return impl_->coordination_.WaitForShutdown();
}
distributed::BfsRpcClients &Worker::bfs_subcursor_clients() {
return impl_->bfs_subcursor_clients_;
}
distributed::DataRpcClients &Worker::data_clients() {
return impl_->data_clients_;
}
distributed::UpdatesRpcServer &Worker::updates_server() {
return impl_->updates_server_;
}
distributed::UpdatesRpcClients &Worker::updates_clients() {
return impl_->updates_clients_;
}
distributed::DataManager &Worker::data_manager() {
return impl_->data_manager_;
}
distributed::PlanConsumer &Worker::plan_consumer() {
return impl_->plan_consumer_;
}
} // namespace database


@ -0,0 +1,125 @@
#pragma once
#include "database/graph_db.hpp"
namespace distributed {
class BfsRpcServer;
class BfsRpcClients;
class DataRpcServer;
class DataRpcClients;
class PlanDispatcher;
class PlanConsumer;
class PullRpcClients;
class ProduceRpcServer;
class UpdatesRpcServer;
class UpdatesRpcClients;
class DataManager;
class IndexRpcClients;
} // namespace distributed
namespace database {
namespace impl {
class Master;
class Worker;
} // namespace impl
/// Abstract base class for concrete distributed versions of GraphDb
class DistributedGraphDb : public GraphDb {
public:
virtual distributed::BfsRpcClients &bfs_subcursor_clients() = 0;
virtual distributed::DataRpcClients &data_clients() = 0;
virtual distributed::UpdatesRpcServer &updates_server() = 0;
virtual distributed::UpdatesRpcClients &updates_clients() = 0;
virtual distributed::DataManager &data_manager() = 0;
};
class Master final : public DistributedGraphDb {
public:
explicit Master(Config config = Config());
~Master();
GraphDb::Type type() const override {
return GraphDb::Type::DISTRIBUTED_MASTER;
}
Storage &storage() override;
durability::WriteAheadLog &wal() override;
tx::Engine &tx_engine() override;
storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;
storage::ConcurrentIdMapper<storage::Property> &property_mapper() override;
database::Counters &counters() override;
void CollectGarbage() override;
int WorkerId() const override;
std::vector<int> GetWorkerIds() const override;
bool MakeSnapshot(GraphDbAccessor &accessor) override;
void ReinitializeStorage() override;
/** Gets this master's endpoint. */
io::network::Endpoint endpoint() const;
/** Gets the endpoint of the worker with the given id. */
// TODO make const once Coordination::GetEndpoint is const.
io::network::Endpoint GetEndpoint(int worker_id);
distributed::BfsRpcClients &bfs_subcursor_clients() override;
distributed::DataRpcClients &data_clients() override;
distributed::UpdatesRpcServer &updates_server() override;
distributed::UpdatesRpcClients &updates_clients() override;
distributed::DataManager &data_manager() override;
distributed::PullRpcClients &pull_clients();
distributed::PlanDispatcher &plan_dispatcher();
distributed::IndexRpcClients &index_rpc_clients();
private:
std::unique_ptr<impl::Master> impl_;
utils::Scheduler transaction_killer_;
std::unique_ptr<utils::Scheduler> snapshot_creator_;
};
class Worker final : public DistributedGraphDb {
public:
explicit Worker(Config config = Config());
~Worker();
GraphDb::Type type() const override {
return GraphDb::Type::DISTRIBUTED_WORKER;
}
Storage &storage() override;
durability::WriteAheadLog &wal() override;
tx::Engine &tx_engine() override;
storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;
storage::ConcurrentIdMapper<storage::Property> &property_mapper() override;
database::Counters &counters() override;
void CollectGarbage() override;
int WorkerId() const override;
std::vector<int> GetWorkerIds() const override;
bool MakeSnapshot(GraphDbAccessor &accessor) override;
void ReinitializeStorage() override;
/** Gets this worker's endpoint. */
io::network::Endpoint endpoint() const;
/** Gets the endpoint of the worker with the given id. */
// TODO make const once Coordination::GetEndpoint is const.
io::network::Endpoint GetEndpoint(int worker_id);
void WaitForShutdown();
distributed::BfsRpcClients &bfs_subcursor_clients() override;
distributed::DataRpcClients &data_clients() override;
distributed::UpdatesRpcServer &updates_server() override;
distributed::UpdatesRpcClients &updates_clients() override;
distributed::DataManager &data_manager() override;
distributed::PlanConsumer &plan_consumer();
private:
std::unique_ptr<impl::Worker> impl_;
utils::Scheduler transaction_killer_;
};
} // namespace database


@ -1,411 +1,74 @@
#include "database/graph_db.hpp"
#include <experimental/optional>
#include "glog/logging.h"
#include <glog/logging.h>
#include "communication/rpc/server.hpp"
#include "database/graph_db.hpp"
#include "database/storage_gc_master.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/storage_gc_single_node.hpp"
#include "database/storage_gc_worker.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "distributed/bfs_rpc_server.hpp"
#include "distributed/cluster_discovery_master.hpp"
#include "distributed/cluster_discovery_worker.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp"
#include "distributed/data_manager.hpp"
#include "distributed/data_rpc_clients.hpp"
#include "distributed/data_rpc_server.hpp"
#include "distributed/durability_rpc_clients.hpp"
#include "distributed/durability_rpc_messages.hpp"
#include "distributed/durability_rpc_server.hpp"
#include "distributed/index_rpc_server.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "distributed/produce_rpc_server.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "distributed/token_sharing_rpc_server.hpp"
#include "distributed/transactional_cache_cleaner.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "durability/paths.hpp"
#include "durability/recovery.hpp"
#include "durability/snapshooter.hpp"
#include "storage/concurrent_id_mapper_master.hpp"
#include "storage/concurrent_id_mapper_single_node.hpp"
#include "storage/concurrent_id_mapper_worker.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/engine_worker.hpp"
#include "utils/file.hpp"
#include "utils/flag_validation.hpp"
using namespace std::literals::chrono_literals;
using namespace storage;
namespace database {
namespace impl {
class PrivateBase : public GraphDb {
public:
explicit PrivateBase(const Config &config) : config_(config) {}
virtual ~PrivateBase() {}
const Config config_;
Storage &storage() override { return *storage_; }
durability::WriteAheadLog &wal() override { return wal_; }
int WorkerId() const override { return config_.worker_id; }
// Makes a local snapshot from the visibility of accessor
bool MakeSnapshot(GraphDbAccessor &accessor) override {
const bool status = durability::MakeSnapshot(
*this, accessor, fs::path(config_.durability_directory),
config_.snapshot_max_retained);
if (status) {
LOG(INFO) << "Snapshot created successfully." << std::endl;
} else {
LOG(ERROR) << "Snapshot creation failed!" << std::endl;
}
return status;
}
void ReinitializeStorage() override {
storage_ =
std::make_unique<Storage>(WorkerId(), config_.properties_on_disk);
}
distributed::PullRpcClients &pull_clients() override {
LOG(FATAL) << "Remote pull clients only available in master.";
}
distributed::ProduceRpcServer &produce_server() override {
LOG(FATAL) << "Remote produce server only available in worker.";
}
distributed::PlanConsumer &plan_consumer() override {
LOG(FATAL) << "Plan consumer only available in distributed worker.";
}
distributed::PlanDispatcher &plan_dispatcher() override {
LOG(FATAL) << "Plan dispatcher only available in distributed master.";
}
distributed::IndexRpcClients &index_rpc_clients() override {
LOG(FATAL) << "Index RPC clients only available in distributed master.";
}
protected:
std::unique_ptr<Storage> storage_ =
std::make_unique<Storage>(config_.worker_id, config_.properties_on_disk);
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
};
template <template <typename TId> class TMapper>
struct TypemapPack {
template <typename... TMapperArgs>
explicit TypemapPack(TMapperArgs &... args)
: label(args...), edge_type(args...), property(args...) {}
// TODO this should also be garbage collected
TMapper<Label> label;
TMapper<EdgeType> edge_type;
TMapper<Property> property;
TMapper<storage::Label> label;
TMapper<storage::EdgeType> edge_type;
TMapper<storage::Property> property;
};
#define IMPL_GETTERS \
tx::Engine &tx_engine() override { return tx_engine_; } \
ConcurrentIdMapper<Label> &label_mapper() override { \
return typemap_pack_.label; \
} \
ConcurrentIdMapper<EdgeType> &edge_type_mapper() override { \
return typemap_pack_.edge_type; \
} \
ConcurrentIdMapper<Property> &property_mapper() override { \
return typemap_pack_.property; \
} \
database::Counters &counters() override { return counters_; } \
void CollectGarbage() override { storage_gc_->CollectGarbage(); }
class SingleNode : public PrivateBase {
class SingleNode {
public:
explicit SingleNode(const Config &config) : PrivateBase(config) {}
GraphDb::Type type() const override { return GraphDb::Type::SINGLE_NODE; }
IMPL_GETTERS
explicit SingleNode(const Config &config) : config_(config) {}
Config config_;
std::unique_ptr<Storage> storage_ =
std::make_unique<Storage>(config_.worker_id, config_.properties_on_disk);
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
tx::SingleNodeEngine tx_engine_{&wal_};
std::unique_ptr<StorageGcSingleNode> storage_gc_ =
std::make_unique<StorageGcSingleNode>(*storage_, tx_engine_,
config_.gc_cycle_sec);
TypemapPack<SingleNodeConcurrentIdMapper> typemap_pack_{
TypemapPack<storage::SingleNodeConcurrentIdMapper> typemap_pack_{
storage_->PropertiesOnDisk()};
database::SingleNodeCounters counters_;
std::vector<int> GetWorkerIds() const override { return {0}; }
distributed::BfsRpcServer &bfs_subcursor_server() override {
LOG(FATAL) << "Subcursor server not available in single-node.";
}
distributed::BfsRpcClients &bfs_subcursor_clients() override {
LOG(FATAL) << "Subcursor clients not available in single-node.";
}
distributed::DataRpcServer &data_server() override {
LOG(FATAL) << "Remote data server not available in single-node.";
}
distributed::DataRpcClients &data_clients() override {
LOG(FATAL) << "Remote data clients not available in single-node.";
}
distributed::PlanDispatcher &plan_dispatcher() override {
LOG(FATAL) << "Plan Dispatcher not available in single-node.";
}
distributed::PlanConsumer &plan_consumer() override {
LOG(FATAL) << "Plan Consumer not available in single-node.";
}
distributed::UpdatesRpcServer &updates_server() override {
LOG(FATAL) << "Remote updates server not available in single-node.";
}
distributed::UpdatesRpcClients &updates_clients() override {
LOG(FATAL) << "Remote updates clients not available in single-node.";
}
distributed::DataManager &data_manager() override {
LOG(FATAL) << "Remote data manager not available in single-node.";
}
void ReinitializeStorage() override {
// Release gc scheduler to stop it from touching storage
storage_gc_ = nullptr;
PrivateBase::ReinitializeStorage();
storage_gc_ = std::make_unique<StorageGcSingleNode>(*storage_, tx_engine_,
config_.gc_cycle_sec);
}
};
#define IMPL_DISTRIBUTED_GETTERS \
std::vector<int> GetWorkerIds() const override { \
return coordination_.GetWorkerIds(); \
} \
distributed::BfsRpcServer &bfs_subcursor_server() override { \
return bfs_subcursor_server_; \
} \
distributed::BfsRpcClients &bfs_subcursor_clients() override { \
return bfs_subcursor_clients_; \
} \
distributed::DataRpcServer &data_server() override { return data_server_; } \
distributed::DataRpcClients &data_clients() override { \
return data_clients_; \
} \
distributed::UpdatesRpcServer &updates_server() override { \
return updates_server_; \
} \
distributed::UpdatesRpcClients &updates_clients() override { \
return updates_clients_; \
} \
distributed::DataManager &data_manager() override { return data_manager_; }
} // namespace impl
class Master : public PrivateBase {
public:
explicit Master(const Config &config) : PrivateBase(config) {}
GraphDb::Type type() const override {
return GraphDb::Type::DISTRIBUTED_MASTER;
}
// Makes a local snapshot and forces the workers to do the same. Snapshot is
// written here only if workers successfully created their own snapshot
bool MakeSnapshot(GraphDbAccessor &accessor) override {
auto workers_snapshot =
durability_rpc_clients_.MakeSnapshot(accessor.transaction_id());
if (!workers_snapshot.get()) return false;
// This can be further optimized by creating master snapshot at the same
// time as workers snapshots but this forces us to delete the master
// snapshot if we succeed in creating it and workers somehow fail. Because
// we have an assumption that every snapshot that exists on master with some
// tx_id visibility also exists on workers
return PrivateBase::MakeSnapshot(accessor);
}
IMPL_GETTERS
IMPL_DISTRIBUTED_GETTERS
distributed::PlanDispatcher &plan_dispatcher() override {
return plan_dispatcher_;
}
distributed::PullRpcClients &pull_clients() override { return pull_clients_; }
distributed::IndexRpcClients &index_rpc_clients() override {
return index_rpc_clients_;
}
void ReinitializeStorage() override {
// Release gc scheduler to stop it from touching storage
storage_gc_ = nullptr;
PrivateBase::ReinitializeStorage();
storage_gc_ = std::make_unique<StorageGcMaster>(
*storage_, tx_engine_, config_.gc_cycle_sec, server_, coordination_);
}
communication::rpc::Server server_{
config_.master_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
tx::MasterEngine tx_engine_{server_, rpc_worker_clients_, &wal_};
distributed::MasterCoordination coordination_{server_.endpoint()};
std::unique_ptr<StorageGcMaster> storage_gc_ =
std::make_unique<StorageGcMaster>(
*storage_, tx_engine_, config_.gc_cycle_sec, server_, coordination_);
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
TypemapPack<MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{server_};
distributed::BfsSubcursorStorage subcursor_storage_{this};
distributed::BfsRpcServer bfs_subcursor_server_{this, &server_,
&subcursor_storage_};
distributed::BfsRpcClients bfs_subcursor_clients_{this, &subcursor_storage_,
&rpc_worker_clients_};
distributed::DurabilityRpcClients durability_rpc_clients_{
rpc_worker_clients_};
distributed::DataRpcServer data_server_{*this, server_};
distributed::DataRpcClients data_clients_{rpc_worker_clients_};
distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_};
distributed::PullRpcClients pull_clients_{rpc_worker_clients_};
distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_};
distributed::UpdatesRpcServer updates_server_{*this, server_};
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
distributed::DataManager data_manager_{*this, data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{
tx_engine_, updates_server_, data_manager_};
distributed::ClusterDiscoveryMaster cluster_discovery_{server_, coordination_,
rpc_worker_clients_};
distributed::TokenSharingRpcClients token_sharing_clients_{
&rpc_worker_clients_};
distributed::TokenSharingRpcServer token_sharing_server_{
this, config_.worker_id, &coordination_, &server_,
&token_sharing_clients_};
};
class Worker : public PrivateBase {
public:
explicit Worker(const Config &config) : PrivateBase(config) {
cluster_discovery_.RegisterWorker(config.worker_id);
}
GraphDb::Type type() const override {
return GraphDb::Type::DISTRIBUTED_WORKER;
}
IMPL_GETTERS
IMPL_DISTRIBUTED_GETTERS
distributed::PlanConsumer &plan_consumer() override { return plan_consumer_; }
distributed::ProduceRpcServer &produce_server() override {
return produce_server_;
}
void ReinitializeStorage() override {
// Release gc scheduler to stop it from touching storage
storage_gc_ = nullptr;
PrivateBase::ReinitializeStorage();
storage_gc_ = std::make_unique<StorageGcWorker>(
*storage_, tx_engine_, config_.gc_cycle_sec,
rpc_worker_clients_.GetClientPool(0), config_.worker_id);
}
communication::rpc::Server server_{
config_.worker_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
distributed::WorkerCoordination coordination_{server_,
config_.master_endpoint};
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
tx::WorkerEngine tx_engine_{rpc_worker_clients_.GetClientPool(0)};
std::unique_ptr<StorageGcWorker> storage_gc_ =
std::make_unique<StorageGcWorker>(
*storage_, tx_engine_, config_.gc_cycle_sec,
rpc_worker_clients_.GetClientPool(0), config_.worker_id);
TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{
rpc_worker_clients_.GetClientPool(0)};
database::WorkerCounters counters_{rpc_worker_clients_.GetClientPool(0)};
distributed::BfsSubcursorStorage subcursor_storage_{this};
distributed::BfsRpcServer bfs_subcursor_server_{this, &server_,
&subcursor_storage_};
distributed::BfsRpcClients bfs_subcursor_clients_{this, &subcursor_storage_,
&rpc_worker_clients_};
distributed::DataRpcServer data_server_{*this, server_};
distributed::DataRpcClients data_clients_{rpc_worker_clients_};
distributed::PlanConsumer plan_consumer_{server_};
distributed::ProduceRpcServer produce_server_{*this, tx_engine_, server_,
plan_consumer_};
distributed::IndexRpcServer index_rpc_server_{*this, server_};
distributed::UpdatesRpcServer updates_server_{*this, server_};
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
distributed::DataManager data_manager_{*this, data_clients_};
distributed::WorkerTransactionalCacheCleaner cache_cleaner_{
tx_engine_, &wal(), server_,
produce_server_, updates_server_, data_manager_};
distributed::DurabilityRpcServer durability_rpc_server_{*this, server_};
distributed::ClusterDiscoveryWorker cluster_discovery_{
server_, coordination_, rpc_worker_clients_.GetClientPool(0)};
distributed::TokenSharingRpcClients token_sharing_clients_{
&rpc_worker_clients_};
distributed::TokenSharingRpcServer token_sharing_server_{
this, config_.worker_id, &coordination_, &server_,
&token_sharing_clients_};
};
#undef IMPL_GETTERS
PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
: impl_(std::move(impl)) {
SingleNode::SingleNode(Config config)
: impl_(std::make_unique<impl::SingleNode>(config)) {
if (impl_->config_.durability_enabled)
utils::CheckDir(impl_->config_.durability_directory);
// Durability recovery.
{
auto db_type = impl_->type();
// What we should recover.
std::experimental::optional<durability::RecoveryInfo>
required_recovery_info;
if (db_type == Type::DISTRIBUTED_WORKER) {
required_recovery_info = dynamic_cast<impl::Worker *>(impl_.get())
->cluster_discovery_.recovery_info();
}
// What we recover.
std::experimental::optional<durability::RecoveryInfo> recovery_info;
// Recover only if necessary.
if ((db_type != Type::DISTRIBUTED_WORKER &&
impl_->config_.db_recover_on_startup) ||
(db_type == Type::DISTRIBUTED_WORKER && required_recovery_info)) {
recovery_info = durability::Recover(impl_->config_.durability_directory,
*impl_, required_recovery_info);
}
// Post-recovery setup and checking.
switch (db_type) {
case Type::DISTRIBUTED_MASTER:
dynamic_cast<impl::Master *>(impl_.get())
->coordination_.SetRecoveryInfo(recovery_info);
if (recovery_info) {
CHECK(impl_->config_.recovering_cluster_size > 0)
<< "Invalid cluster recovery size flag. Recovered cluster size "
"should be at least 1";
while (dynamic_cast<impl::Master *>(impl_.get())
->coordination_.CountRecoveredWorkers() !=
impl_->config_.recovering_cluster_size - 1) {
LOG(INFO) << "Waiting for workers to finish recovering..";
std::this_thread::sleep_for(2s);
}
}
// Start the dynamic graph partitioner inside token sharing server
if (impl_->config_.dynamic_graph_partitioner_enabled) {
dynamic_cast<impl::Master *>(impl_.get())
->token_sharing_server_.StartTokenSharing();
}
break;
case Type::DISTRIBUTED_WORKER:
if (required_recovery_info != recovery_info)
LOG(FATAL) << "Memgraph worker failed to recover the database state "
"recovered on the master";
dynamic_cast<impl::Worker *>(impl_.get())
->cluster_discovery_.NotifyWorkerRecovered();
break;
case Type::SINGLE_NODE:
break;
}
// Recover only if necessary.
if (impl_->config_.db_recover_on_startup) {
durability::Recover(impl_->config_.durability_directory, *this,
std::experimental::nullopt);
}
if (impl_->config_.durability_enabled) {
impl_->wal().Enable();
impl_->wal_.Enable();
snapshot_creator_ = std::make_unique<utils::Scheduler>();
snapshot_creator_->Run(
"Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
[this] {
GraphDbAccessor dba(*this);
this->MakeSnapshot(dba);
});
}
// Start transaction killer.
@ -415,7 +78,7 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
std::chrono::seconds(std::max(
1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
[this]() {
impl_->tx_engine().LocalForEachActiveTransaction(
impl_->tx_engine_.LocalForEachActiveTransaction(
[this](tx::Transaction &t) {
if (t.creation_time() +
std::chrono::seconds(
@ -428,128 +91,64 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
}
}
PublicBase::~PublicBase() {
SingleNode::~SingleNode() {
snapshot_creator_ = nullptr;
is_accepting_transactions_ = false;
tx_engine().LocalForEachActiveTransaction(
impl_->tx_engine_.LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
// If we are not a worker we can do a snapshot on exit if it's enabled. Doing
// this on the master forces workers to do the same through rpcs
if (impl_->config_.snapshot_on_exit &&
impl_->type() != Type::DISTRIBUTED_WORKER) {
if (impl_->config_.snapshot_on_exit) {
GraphDbAccessor dba(*this);
MakeSnapshot(dba);
}
}
GraphDb::Type PublicBase::type() const { return impl_->type(); }
Storage &PublicBase::storage() { return impl_->storage(); }
durability::WriteAheadLog &PublicBase::wal() { return impl_->wal(); }
tx::Engine &PublicBase::tx_engine() { return impl_->tx_engine(); }
ConcurrentIdMapper<Label> &PublicBase::label_mapper() {
return impl_->label_mapper();
}
ConcurrentIdMapper<EdgeType> &PublicBase::edge_type_mapper() {
return impl_->edge_type_mapper();
}
ConcurrentIdMapper<Property> &PublicBase::property_mapper() {
return impl_->property_mapper();
}
database::Counters &PublicBase::counters() { return impl_->counters(); }
void PublicBase::CollectGarbage() { impl_->CollectGarbage(); }
int PublicBase::WorkerId() const { return impl_->WorkerId(); }
std::vector<int> PublicBase::GetWorkerIds() const {
return impl_->GetWorkerIds();
}
distributed::BfsRpcServer &PublicBase::bfs_subcursor_server() {
return impl_->bfs_subcursor_server();
}
distributed::BfsRpcClients &PublicBase::bfs_subcursor_clients() {
return impl_->bfs_subcursor_clients();
}
distributed::DataRpcServer &PublicBase::data_server() {
return impl_->data_server();
}
distributed::DataRpcClients &PublicBase::data_clients() {
return impl_->data_clients();
}
distributed::PlanDispatcher &PublicBase::plan_dispatcher() {
return impl_->plan_dispatcher();
}
distributed::IndexRpcClients &PublicBase::index_rpc_clients() {
return impl_->index_rpc_clients();
}
distributed::PlanConsumer &PublicBase::plan_consumer() {
return impl_->plan_consumer();
}
distributed::PullRpcClients &PublicBase::pull_clients() {
return impl_->pull_clients();
}
distributed::ProduceRpcServer &PublicBase::produce_server() {
return impl_->produce_server();
}
distributed::UpdatesRpcServer &PublicBase::updates_server() {
return impl_->updates_server();
}
distributed::UpdatesRpcClients &PublicBase::updates_clients() {
return impl_->updates_clients();
}
distributed::DataManager &PublicBase::data_manager() {
return impl_->data_manager();
Storage &SingleNode::storage() { return *impl_->storage_; }
durability::WriteAheadLog &SingleNode::wal() { return impl_->wal_; }
tx::Engine &SingleNode::tx_engine() { return impl_->tx_engine_; }
storage::ConcurrentIdMapper<storage::Label> &SingleNode::label_mapper() {
return impl_->typemap_pack_.label;
}
bool PublicBase::MakeSnapshot(GraphDbAccessor &accessor) {
return impl_->MakeSnapshot(accessor);
storage::ConcurrentIdMapper<storage::EdgeType> &SingleNode::edge_type_mapper() {
return impl_->typemap_pack_.edge_type;
}
void PublicBase::ReinitializeStorage() { impl_->ReinitializeStorage(); }
storage::ConcurrentIdMapper<storage::Property> &SingleNode::property_mapper() {
return impl_->typemap_pack_.property;
}
} // namespace impl
database::Counters &SingleNode::counters() { return impl_->counters_; }
MasterBase::MasterBase(std::unique_ptr<impl::PrivateBase> impl)
: PublicBase(std::move(impl)) {
if (impl_->config_.durability_enabled) {
impl_->wal().Enable();
snapshot_creator_ = std::make_unique<utils::Scheduler>();
snapshot_creator_->Run(
"Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
[this] {
GraphDbAccessor dba(*this);
impl_->MakeSnapshot(dba);
});
void SingleNode::CollectGarbage() { impl_->storage_gc_->CollectGarbage(); }
int SingleNode::WorkerId() const { return impl_->config_.worker_id; }
std::vector<int> SingleNode::GetWorkerIds() const { return {0}; }
bool SingleNode::MakeSnapshot(GraphDbAccessor &accessor) {
const bool status = durability::MakeSnapshot(
*this, accessor, fs::path(impl_->config_.durability_directory),
impl_->config_.snapshot_max_retained);
if (status) {
LOG(INFO) << "Snapshot created successfully.";
} else {
LOG(ERROR) << "Snapshot creation failed!";
}
return status;
}
MasterBase::~MasterBase() { snapshot_creator_ = nullptr; }
SingleNode::SingleNode(Config config)
: MasterBase(std::make_unique<impl::SingleNode>(config)) {}
Master::Master(Config config)
: MasterBase(std::make_unique<impl::Master>(config)) {}
io::network::Endpoint Master::endpoint() const {
return dynamic_cast<impl::Master *>(impl_.get())->server_.endpoint();
void SingleNode::ReinitializeStorage() {
// Release gc scheduler to stop it from touching storage
impl_->storage_gc_ = nullptr;
impl_->storage_ = std::make_unique<Storage>(
impl_->config_.worker_id, impl_->config_.properties_on_disk);
impl_->storage_gc_ = std::make_unique<StorageGcSingleNode>(
*impl_->storage_, impl_->tx_engine_, impl_->config_.gc_cycle_sec);
}
io::network::Endpoint Master::GetEndpoint(int worker_id) {
return dynamic_cast<impl::Master *>(impl_.get())
->coordination_.GetEndpoint(worker_id);
}
Worker::Worker(Config config)
: PublicBase(std::make_unique<impl::Worker>(config)) {}
io::network::Endpoint Worker::endpoint() const {
return dynamic_cast<impl::Worker *>(impl_.get())->server_.endpoint();
}
io::network::Endpoint Worker::GetEndpoint(int worker_id) {
return dynamic_cast<impl::Worker *>(impl_.get())
->coordination_.GetEndpoint(worker_id);
}
void Worker::WaitForShutdown() {
dynamic_cast<impl::Worker *>(impl_.get())->coordination_.WaitForShutdown();
}
} // namespace database


@ -14,21 +14,6 @@
#include "transactions/engine.hpp"
#include "utils/scheduler.hpp"
namespace distributed {
class BfsRpcServer;
class BfsRpcClients;
class DataRpcServer;
class DataRpcClients;
class PlanDispatcher;
class PlanConsumer;
class PullRpcClients;
class ProduceRpcServer;
class UpdatesRpcServer;
class UpdatesRpcClients;
class DataManager;
class IndexRpcClients;
} // namespace distributed
namespace database {
/// Database configuration. Initialized from flags, but modifiable.
@ -87,6 +72,11 @@ class GraphDb {
enum class Type { SINGLE_NODE, DISTRIBUTED_MASTER, DISTRIBUTED_WORKER };
GraphDb() {}
GraphDb(const GraphDb &) = delete;
GraphDb(GraphDb &&) = delete;
GraphDb &operator=(const GraphDb &) = delete;
GraphDb &operator=(GraphDb &&) = delete;
virtual ~GraphDb() {}
virtual Type type() const = 0;
@ -102,25 +92,6 @@ class GraphDb {
virtual int WorkerId() const = 0;
virtual std::vector<int> GetWorkerIds() const = 0;
// Supported only in distributed master and worker, not in single-node.
virtual distributed::BfsRpcServer &bfs_subcursor_server() = 0;
virtual distributed::BfsRpcClients &bfs_subcursor_clients() = 0;
virtual distributed::DataRpcServer &data_server() = 0;
virtual distributed::DataRpcClients &data_clients() = 0;
virtual distributed::UpdatesRpcServer &updates_server() = 0;
virtual distributed::UpdatesRpcClients &updates_clients() = 0;
virtual distributed::DataManager &data_manager() = 0;
// Supported only in distributed master.
virtual distributed::PullRpcClients &pull_clients() = 0;
virtual distributed::PlanDispatcher &plan_dispatcher() = 0;
virtual distributed::IndexRpcClients &index_rpc_clients() = 0;
// Supported only in distributed worker.
// TODO remove once end2end testing is possible.
virtual distributed::ProduceRpcServer &produce_server() = 0;
virtual distributed::PlanConsumer &plan_consumer() = 0;
// Makes a snapshot from the visibility of the given accessor
virtual bool MakeSnapshot(GraphDbAccessor &accessor) = 0;
@ -130,23 +101,23 @@ class GraphDb {
// recovery
virtual void ReinitializeStorage() = 0;
GraphDb(const GraphDb &) = delete;
GraphDb(GraphDb &&) = delete;
GraphDb &operator=(const GraphDb &) = delete;
GraphDb &operator=(GraphDb &&) = delete;
/** When this is false, no new transactions should be created. */
bool is_accepting_transactions() const { return is_accepting_transactions_; }
protected:
std::atomic<bool> is_accepting_transactions_{true};
};
namespace impl {
// Private GraphDb implementations all inherit `PrivateBase`.
// Public GraphDb implementations all inherit `PublicBase`.
class PrivateBase;
class SingleNode;
} // namespace impl
// Base class for all GraphDb implementations exposes to the client programmer.
// Encapsulates an instance of a private implementation of GraphDb and performs
// initialization and cleanup.
class PublicBase : public GraphDb {
class SingleNode final : public GraphDb {
public:
Type type() const override;
explicit SingleNode(Config config = Config());
~SingleNode();
Type type() const override { return GraphDb::Type::SINGLE_NODE; }
Storage &storage() override;
durability::WriteAheadLog &wal() override;
tx::Engine &tx_engine() override;
@ -157,68 +128,15 @@ class PublicBase : public GraphDb {
void CollectGarbage() override;
int WorkerId() const override;
std::vector<int> GetWorkerIds() const override;
distributed::BfsRpcServer &bfs_subcursor_server() override;
distributed::BfsRpcClients &bfs_subcursor_clients() override;
distributed::DataRpcServer &data_server() override;
distributed::DataRpcClients &data_clients() override;
distributed::PlanDispatcher &plan_dispatcher() override;
distributed::IndexRpcClients &index_rpc_clients() override;
distributed::PlanConsumer &plan_consumer() override;
distributed::PullRpcClients &pull_clients() override;
distributed::ProduceRpcServer &produce_server() override;
distributed::UpdatesRpcServer &updates_server() override;
distributed::UpdatesRpcClients &updates_clients() override;
distributed::DataManager &data_manager() override;
bool is_accepting_transactions() const { return is_accepting_transactions_; }
bool MakeSnapshot(GraphDbAccessor &accessor) override;
void ReinitializeStorage() override;
protected:
explicit PublicBase(std::unique_ptr<PrivateBase> impl);
~PublicBase();
std::unique_ptr<PrivateBase> impl_;
private:
/** When this is false, no new transactions should be created. */
std::atomic<bool> is_accepting_transactions_{true};
std::unique_ptr<impl::SingleNode> impl_;
std::unique_ptr<utils::Scheduler> snapshot_creator_;
utils::Scheduler transaction_killer_;
};
} // namespace impl
class MasterBase : public impl::PublicBase {
public:
explicit MasterBase(std::unique_ptr<impl::PrivateBase> impl);
~MasterBase();
private:
std::unique_ptr<utils::Scheduler> snapshot_creator_;
};
class SingleNode : public MasterBase {
public:
explicit SingleNode(Config config = Config());
};
class Master : public MasterBase {
public:
explicit Master(Config config = Config());
/** Gets this master's endpoint. */
io::network::Endpoint endpoint() const;
/** Gets the endpoint of the worker with the given id. */
// TODO make const once Coordination::GetEndpoint is const.
io::network::Endpoint GetEndpoint(int worker_id);
};
class Worker : public impl::PublicBase {
public:
explicit Worker(Config config = Config());
/** Gets this worker's endpoint. */
io::network::Endpoint endpoint() const;
/** Gets the endpoint of the worker with the given id. */
// TODO make const once Coordination::GetEndpoint is const.
io::network::Endpoint GetEndpoint(int worker_id);
void WaitForShutdown();
};
} // namespace database


@ -1,8 +1,10 @@
#include "database/graph_db_accessor.hpp"
#include <functional>
#include "glog/logging.h"
#include "database/graph_db_accessor.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/state_delta.hpp"
#include "distributed/data_manager.hpp"
#include "distributed/rpc_worker_clients.hpp"
@ -88,15 +90,23 @@ VertexAccessor GraphDbAccessor::InsertVertexIntoRemote(
CHECK(worker_id != db().WorkerId())
<< "Not allowed to call InsertVertexIntoRemote for local worker";
gid::Gid gid = db().updates_clients().CreateVertex(
worker_id, transaction_id(), labels, properties);
distributed::UpdatesRpcClients *updates_clients = nullptr;
distributed::DataManager *data_manager = nullptr;
// TODO: Replace this with virtual call or some other mechanism.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&db())) {
updates_clients = &distributed_db->updates_clients();
data_manager = &distributed_db->data_manager();
}
CHECK(updates_clients && data_manager);
gid::Gid gid = updates_clients->CreateVertex(worker_id, transaction_id(),
labels, properties);
auto vertex = std::make_unique<Vertex>();
vertex->labels_ = labels;
for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second);
db().data_manager()
.Elements<Vertex>(transaction_id())
data_manager->Elements<Vertex>(transaction_id())
.emplace(gid, nullptr, std::move(vertex));
return VertexAccessor({gid, worker_id}, *this);
}
@ -159,8 +169,11 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
// Notify all workers to create the index
if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) {
index_rpc_completions.emplace(db_.index_rpc_clients().GetCreateIndexFutures(
label, property, this->db_.WorkerId()));
// TODO: Replace this with virtual call or some other mechanism.
database::Master *master_db = dynamic_cast<database::Master *>(&db_);
index_rpc_completions.emplace(
master_db->index_rpc_clients().GetCreateIndexFutures(
label, property, this->db_.WorkerId()));
}
if (index_rpc_completions) {
@ -204,8 +217,10 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
// Notify all workers to start populating an index if we are the master since
// they don't have to wait anymore
if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) {
// TODO: Replace this with virtual call or some other mechanism.
database::Master *master_db = dynamic_cast<database::Master *>(&db_);
index_rpc_completions.emplace(
db_.index_rpc_clients().GetPopulateIndexFutures(
master_db->index_rpc_clients().GetPopulateIndexFutures(
label, property, dba.transaction_id(), this->db_.WorkerId()));
}
@ -364,8 +379,15 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor,
if (!vertex_accessor.is_local()) {
auto address = vertex_accessor.address();
db().updates_clients().RemoveVertex(address.worker_id(), transaction_id(),
address.gid(), check_empty);
distributed::UpdatesRpcClients *updates_clients = nullptr;
// TODO: Replace this with virtual call or some other mechanism.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&db())) {
updates_clients = &distributed_db->updates_clients();
}
CHECK(updates_clients);
updates_clients->RemoveVertex(address.worker_id(), transaction_id(),
address.gid(), check_empty);
// We can't know if we are going to be able to remove vertex until deferred
// updates on a remote worker are executed
return true;
@ -429,16 +451,23 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
from.gid(), to.gid(), edge_type, EdgeTypeName(edge_type)));
} else {
edge_address = db().updates_clients().CreateEdge(transaction_id(), from, to,
edge_type);
distributed::UpdatesRpcClients *updates_clients = nullptr;
distributed::DataManager *data_manager = nullptr;
// TODO: Replace this with virtual call or some other mechanism.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&db())) {
updates_clients = &distributed_db->updates_clients();
data_manager = &distributed_db->data_manager();
}
CHECK(updates_clients && data_manager);
edge_address =
updates_clients->CreateEdge(transaction_id(), from, to, edge_type);
from_updated = db().data_manager()
.Elements<Vertex>(transaction_id())
.FindNew(from.gid());
from_updated =
data_manager->Elements<Vertex>(transaction_id()).FindNew(from.gid());
// Create an Edge and insert it into the Cache so we see it locally.
db().data_manager()
.Elements<Edge>(transaction_id())
data_manager->Elements<Edge>(transaction_id())
.emplace(
edge_address.gid(), nullptr,
std::make_unique<Edge>(from.address(), to.address(), edge_type));
@ -455,16 +484,24 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
to.SwitchNew();
to_updated = &to.update();
} else {
distributed::UpdatesRpcClients *updates_clients = nullptr;
distributed::DataManager *data_manager = nullptr;
// TODO: Replace this with virtual call or some other mechanism.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&db())) {
updates_clients = &distributed_db->updates_clients();
data_manager = &distributed_db->data_manager();
}
CHECK(updates_clients && data_manager);
// The RPC call for the `to` side is already handled if `from` is not local.
if (from.is_local() ||
from.address().worker_id() != to.address().worker_id()) {
db().updates_clients().AddInEdge(
transaction_id(), from,
db().storage().GlobalizedAddress(edge_address), to, edge_type);
updates_clients->AddInEdge(transaction_id(), from,
db().storage().GlobalizedAddress(edge_address),
to, edge_type);
}
to_updated = db().data_manager()
.Elements<Vertex>(transaction_id())
.FindNew(to.gid());
to_updated =
data_manager->Elements<Vertex>(transaction_id()).FindNew(to.gid());
}
to_updated->in_.emplace(
db_.storage().LocalizedAddressIfPossible(from.address()), edge_address,
@ -522,15 +559,21 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge,
CHECK(edge_addr.worker_id() == from_addr.worker_id())
<< "Edge and it's 'from' vertex not on the same worker";
auto to_addr = db().storage().GlobalizedAddress(edge.to_addr());
db().updates_clients().RemoveEdge(transaction_id(), edge_addr.worker_id(),
edge_addr.gid(), from_addr.gid(),
to_addr);
distributed::UpdatesRpcClients *updates_clients = nullptr;
// TODO: Replace this with virtual call or some other mechanism.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&db())) {
updates_clients = &distributed_db->updates_clients();
}
CHECK(updates_clients);
updates_clients->RemoveEdge(transaction_id(), edge_addr.worker_id(),
edge_addr.gid(), from_addr.gid(), to_addr);
// Another RPC is necessary only if the first did not handle vertices on
// both sides.
if (edge_addr.worker_id() != to_addr.worker_id()) {
db().updates_clients().RemoveInEdge(transaction_id(), to_addr.worker_id(),
to_addr.gid(), edge_addr);
updates_clients->RemoveInEdge(transaction_id(), to_addr.worker_id(),
to_addr.gid(), edge_addr);
}
}
}
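
The hunks above all follow the same shape: try to dynamic_cast the accessor's GraphDb to DistributedGraphDb, CHECK the result, then call the distributed-only facility. A minimal standalone sketch of that shape, assuming simplified `*Like` stand-in classes (not the real GraphDb/UpdatesRpcClients) and using `assert` in place of glog's `CHECK`:

```
#include <cassert>
#include <iostream>

// Hypothetical stand-ins; the real classes are far richer.
struct UpdatesClients {
  void RemoveVertex(int worker_id, long gid) {
    std::cout << "RPC: remove vertex " << gid << " on worker " << worker_id << "\n";
  }
};

struct GraphDbLike {
  virtual ~GraphDbLike() = default;
};

struct DistributedGraphDbLike : GraphDbLike {
  UpdatesClients &updates_clients() { return updates_clients_; }
 private:
  UpdatesClients updates_clients_;
};

// Mirrors the accessor code: resolve the distributed-only facility with a
// dynamic_cast and fail loudly if the db is not a distributed one.
void RemoveRemoteVertex(GraphDbLike &db, int worker_id, long gid) {
  auto *distributed = dynamic_cast<DistributedGraphDbLike *>(&db);
  assert(distributed && "expected a distributed GraphDb");  // CHECK() stand-in
  distributed->updates_clients().RemoveVertex(worker_id, gid);
}

int main() {
  DistributedGraphDbLike db;
  RemoveRemoteVertex(db, 2, 42);
}
```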

View File

@ -1,14 +1,19 @@
#include "bfs_rpc_clients.hpp"
#include "database/distributed_graph_db.hpp"
#include "distributed/bfs_rpc_messages.hpp"
#include "distributed/data_manager.hpp"
#include "bfs_rpc_clients.hpp"
namespace distributed {
BfsRpcClients::BfsRpcClients(
database::GraphDb *db, distributed::BfsSubcursorStorage *subcursor_storage,
distributed::RpcWorkerClients *clients)
: db_(db), subcursor_storage_(subcursor_storage), clients_(clients) {}
BfsRpcClients::BfsRpcClients(database::GraphDb *db,
BfsSubcursorStorage *subcursor_storage,
RpcWorkerClients *clients,
DataManager *data_manager)
: db_(db),
subcursor_storage_(subcursor_storage),
clients_(clients),
data_manager_(data_manager) {}
std::unordered_map<int16_t, int64_t> BfsRpcClients::CreateBfsSubcursors(
tx::TransactionId tx_id, query::EdgeAtom::Direction direction,
@ -77,14 +82,12 @@ std::experimental::optional<VertexAccessor> BfsRpcClients::Pull(
CHECK(res) << "SubcursorPull RPC failed!";
if (!res->vertex) return std::experimental::nullopt;
db_->data_manager()
.Elements<Vertex>(dba->transaction_id())
data_manager_->Elements<Vertex>(dba->transaction_id())
.emplace(res->vertex->global_address.gid(),
std::move(res->vertex->old_element_output),
std::move(res->vertex->new_element_output));
return VertexAccessor(res->vertex->global_address, *dba);
}
bool BfsRpcClients::ExpandLevel(
const std::unordered_map<int16_t, int64_t> &subcursor_ids) {
auto futures = clients_->ExecuteOnWorkers<bool>(
@ -133,12 +136,11 @@ bool BfsRpcClients::ExpandToRemoteVertex(
}
PathSegment BuildPathSegment(ReconstructPathRes *res,
database::GraphDbAccessor *dba) {
database::GraphDbAccessor *dba,
distributed::DataManager *data_manager) {
std::vector<EdgeAccessor> edges;
for (auto &edge : res->edges) {
dba->db()
.data_manager()
.Elements<Edge>(dba->transaction_id())
data_manager->Elements<Edge>(dba->transaction_id())
.emplace(edge.global_address.gid(), std::move(edge.old_element_output),
std::move(edge.new_element_output));
edges.emplace_back(edge.global_address, *dba);
@ -158,7 +160,7 @@ PathSegment BfsRpcClients::ReconstructPath(
auto res = clients_->GetClientPool(worker_id).Call<ReconstructPathRpc>(
subcursor_ids.at(worker_id), vertex);
return BuildPathSegment(&res.value(), dba);
return BuildPathSegment(&res.value(), dba, data_manager_);
}
PathSegment BfsRpcClients::ReconstructPath(
@ -171,7 +173,7 @@ PathSegment BfsRpcClients::ReconstructPath(
}
auto res = clients_->GetClientPool(worker_id).Call<ReconstructPathRpc>(
subcursor_ids.at(worker_id), edge);
return BuildPathSegment(&res.value(), dba);
return BuildPathSegment(&res.value(), dba, data_manager_);
}
void BfsRpcClients::PrepareForExpand(
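
The constructor change above is plain dependency injection: `BfsRpcClients` now receives the `DataManager` it needs instead of reaching for it through a concrete `GraphDb`. A tiny sketch of the same idea under assumed, simplified types (`DataManagerLike`, `BfsClientsLike`), not the real API:

```
#include <cassert>
#include <iostream>

// Hypothetical simplified types; the real DataManager/BfsRpcClients differ.
struct DataManagerLike {
  void CacheVertex(long gid) { std::cout << "cached vertex " << gid << "\n"; }
};

class BfsClientsLike {
 public:
  // The dependency is passed in explicitly instead of being pulled out of a
  // GraphDb, so the class no longer cares which GraphDb flavour created it.
  explicit BfsClientsLike(DataManagerLike *data_manager)
      : data_manager_(data_manager) {
    assert(data_manager_);
  }

  void Pull(long gid) { data_manager_->CacheVertex(gid); }

 private:
  DataManagerLike *data_manager_{nullptr};
};

int main() {
  DataManagerLike data_manager;
  BfsClientsLike clients(&data_manager);
  clients.Pull(7);
}
```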

View File

@ -7,6 +7,8 @@
namespace distributed {
class DataManager;
/// Along with `BfsRpcServer`, this class is used to expose the `BfsSubcursor`
/// interface over the network so that subcursors can communicate during the
/// traversal. It is just a thin wrapper making RPC calls that also takes
@ -16,8 +18,9 @@ namespace distributed {
class BfsRpcClients {
public:
BfsRpcClients(database::GraphDb *db,
distributed::BfsSubcursorStorage *subcursor_storage,
distributed::RpcWorkerClients *clients);
BfsSubcursorStorage *subcursor_storage,
RpcWorkerClients *clients,
DataManager *data_manager);
std::unordered_map<int16_t, int64_t> CreateBfsSubcursors(
tx::TransactionId tx_id, query::EdgeAtom::Direction direction,
@ -57,9 +60,10 @@ class BfsRpcClients {
const std::unordered_map<int16_t, int64_t> &subcursor_ids, bool clear);
private:
database::GraphDb *db_;
distributed::BfsSubcursorStorage *subcursor_storage_;
distributed::RpcWorkerClients *clients_;
database::GraphDb *db_{nullptr};
distributed::BfsSubcursorStorage *subcursor_storage_{nullptr};
distributed::RpcWorkerClients *clients_{nullptr};
distributed::DataManager *data_manager_{nullptr};
};
} // namespace distributed

View File

@ -1,12 +1,13 @@
#include "bfs_subcursor.hpp"
#include <unordered_map>
#include "database/distributed_graph_db.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "query/plan/operator.hpp"
#include "storage/address_types.hpp"
#include "storage/vertex_accessor.hpp"
#include "bfs_subcursor.hpp"
namespace distributed {
using query::TypedValue;
@ -14,9 +15,10 @@ using query::TypedValue;
ExpandBfsSubcursor::ExpandBfsSubcursor(
database::GraphDb *db, tx::TransactionId tx_id,
query::EdgeAtom::Direction direction,
std::vector<storage::EdgeType> edge_types, query::GraphView graph_view)
: dba_(*db, tx_id),
std::vector<storage::EdgeType> edge_types, query::GraphView graph_view,
BfsRpcClients *bfs_subcursor_clients)
: bfs_subcursor_clients_(bfs_subcursor_clients),
dba_(*db, tx_id),
direction_(direction),
edge_types_(std::move(edge_types)),
graph_view_(graph_view) {
@ -145,10 +147,9 @@ void ExpandBfsSubcursor::ReconstructPathHelper(VertexAccessor vertex,
bool ExpandBfsSubcursor::ExpandToVertex(EdgeAccessor edge,
VertexAccessor vertex) {
// TODO(mtomic): lambda filtering in distributed
return vertex.is_local()
? ExpandToLocalVertex(edge.address(), vertex)
: dba_.db().bfs_subcursor_clients().ExpandToRemoteVertex(
subcursor_ids_, edge, vertex);
return vertex.is_local() ? ExpandToLocalVertex(edge.address(), vertex)
: bfs_subcursor_clients_->ExpandToRemoteVertex(
subcursor_ids_, edge, vertex);
}
bool ExpandBfsSubcursor::ExpandFromVertex(VertexAccessor vertex) {
@ -164,7 +165,9 @@ bool ExpandBfsSubcursor::ExpandFromVertex(VertexAccessor vertex) {
return expanded;
}
BfsSubcursorStorage::BfsSubcursorStorage(database::GraphDb *db) : db_(db) {}
BfsSubcursorStorage::BfsSubcursorStorage(database::GraphDb *db,
BfsRpcClients *bfs_subcursor_clients)
: db_(db), bfs_subcursor_clients_(bfs_subcursor_clients) {}
int64_t BfsSubcursorStorage::Create(tx::TransactionId tx_id,
query::EdgeAtom::Direction direction,
@ -172,9 +175,10 @@ int64_t BfsSubcursorStorage::Create(tx::TransactionId tx_id,
query::GraphView graph_view) {
std::lock_guard<std::mutex> lock(mutex_);
int64_t id = next_subcursor_id_++;
auto got = storage_.emplace(
id, std::make_unique<ExpandBfsSubcursor>(
db_, tx_id, direction, std::move(edge_types), graph_view));
auto got =
storage_.emplace(id, std::make_unique<ExpandBfsSubcursor>(
db_, tx_id, direction, std::move(edge_types),
graph_view, bfs_subcursor_clients_));
CHECK(got.second) << "Subcursor with ID " << id << " already exists";
return id;
}
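
`BfsSubcursorStorage` now remembers the `BfsRpcClients` pointer it was handed and forwards it into every subcursor it creates. A compact, self-contained sketch of that forwarding-factory pattern, with made-up `*Like` types rather than the real classes:

```
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>

// Hypothetical stand-ins for BfsRpcClients / ExpandBfsSubcursor.
struct ClientsLike {
  void Hello(int64_t id) { std::cout << "subcursor " << id << " created\n"; }
};

struct SubcursorLike {
  SubcursorLike(int64_t id, ClientsLike *clients) : id_(id), clients_(clients) {
    clients_->Hello(id_);
  }
  int64_t id_;
  ClientsLike *clients_;
};

// The storage owns no clients; it just keeps the pointer it was given and
// forwards it into every subcursor it constructs.
class SubcursorStorageLike {
 public:
  explicit SubcursorStorageLike(ClientsLike *clients) : clients_(clients) {}

  int64_t Create() {
    std::lock_guard<std::mutex> lock(mutex_);
    int64_t id = next_id_++;
    storage_.emplace(id, std::make_unique<SubcursorLike>(id, clients_));
    return id;
  }

 private:
  ClientsLike *clients_{nullptr};
  std::mutex mutex_;
  int64_t next_id_{0};
  std::map<int64_t, std::unique_ptr<SubcursorLike>> storage_;
};

int main() {
  ClientsLike clients;
  SubcursorStorageLike storage(&clients);
  storage.Create();
  storage.Create();
}
```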

View File

@ -16,6 +16,8 @@ class GraphDb;
namespace distributed {
class BfsRpcClients;
/// Path from BFS source to a vertex might span multiple workers. This struct
/// stores information describing a segment of a path stored on a worker and the
/// information necessary to continue path reconstruction on another worker.
@ -33,7 +35,8 @@ class ExpandBfsSubcursor {
ExpandBfsSubcursor(database::GraphDb *db, tx::TransactionId tx_id,
query::EdgeAtom::Direction direction,
std::vector<storage::EdgeType> edge_types,
query::GraphView graph_view);
query::GraphView graph_view,
BfsRpcClients *bfs_subcursor_clients);
// Stores subcursor ids of other workers.
void RegisterSubcursors(std::unordered_map<int16_t, int64_t> subcursor_ids) {
@ -73,7 +76,7 @@ class ExpandBfsSubcursor {
/// Reconstruct the part of path to given vertex stored on this worker.
PathSegment ReconstructPath(storage::VertexAddress vertex_addr);
/// Used to reset subcursor state before starting expansion from a new source.
void Reset();
@ -89,6 +92,8 @@ class ExpandBfsSubcursor {
/// Helper for path reconstruction doing the actual work.
void ReconstructPathHelper(VertexAccessor vertex, PathSegment *result);
BfsRpcClients *bfs_subcursor_clients_{nullptr};
database::GraphDbAccessor dba_;
/// IDs of subcursors on other workers, used when sending RPCs.
@ -123,7 +128,8 @@ class ExpandBfsSubcursor {
/// Thread-safe storage for BFS subcursors.
class BfsSubcursorStorage {
public:
explicit BfsSubcursorStorage(database::GraphDb *db);
explicit BfsSubcursorStorage(database::GraphDb *db,
BfsRpcClients *bfs_subcursor_clients);
int64_t Create(tx::TransactionId tx_id, query::EdgeAtom::Direction direction,
std::vector<storage::EdgeType> edge_types,
@ -132,7 +138,8 @@ class BfsSubcursorStorage {
ExpandBfsSubcursor *Get(int64_t subcursor_id);
private:
database::GraphDb *db_;
database::GraphDb *db_{nullptr};
BfsRpcClients *bfs_subcursor_clients_{nullptr};
std::mutex mutex_;
std::map<int64_t, std::unique_ptr<ExpandBfsSubcursor>> storage_;

View File

@ -98,7 +98,8 @@ ProduceRpcServer::OngoingProduce::PullOneFromCursor() {
ProduceRpcServer::ProduceRpcServer(
database::GraphDb &db, tx::Engine &tx_engine,
communication::rpc::Server &server,
const distributed::PlanConsumer &plan_consumer)
const PlanConsumer &plan_consumer,
DataManager *data_manager)
: db_(db),
produce_rpc_server_(server),
plan_consumer_(plan_consumer),
@ -120,12 +121,14 @@ ProduceRpcServer::ProduceRpcServer(
res.Save(res_builder);
});
CHECK(data_manager);
produce_rpc_server_.Register<TransactionCommandAdvancedRpc>(
[this](const auto &req_reader, auto *res_builder) {
[this, data_manager](const auto &req_reader, auto *res_builder) {
TransactionCommandAdvancedReq req;
req.Load(req_reader);
tx_engine_.UpdateCommand(req.member);
db_.data_manager().ClearCacheForSingleTransaction(req.member);
data_manager->ClearCacheForSingleTransaction(req.member);
TransactionCommandAdvancedRes res;
res.Save(res_builder);
});
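
The `ProduceRpcServer` change validates the injected `DataManager` once and then captures the raw pointer by value in the registered handler lambda. A rough standalone sketch of that pattern, with a toy `RpcServerLike` standing in for the real RPC server:

```
#include <cassert>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

// Hypothetical stand-ins; the real RPC server and DataManager look different.
struct DataManagerLike {
  void ClearCacheForSingleTransaction(int tx_id) {
    std::cout << "cleared caches for tx " << tx_id << "\n";
  }
};

class RpcServerLike {
 public:
  using Handler = std::function<void(int)>;
  void Register(const std::string &name, Handler handler) {
    handlers_[name] = std::move(handler);
  }
  void Call(const std::string &name, int arg) { handlers_.at(name)(arg); }

 private:
  std::unordered_map<std::string, Handler> handlers_;
};

int main() {
  DataManagerLike data_manager;
  RpcServerLike server;

  DataManagerLike *dm = &data_manager;
  assert(dm);  // validate once, before registration (CHECK() stand-in)
  // Capture the pointer by value; the handler no longer goes through db_.
  server.Register("TransactionCommandAdvanced",
                  [dm](int tx_id) { dm->ClearCacheForSingleTransaction(tx_id); });

  server.Call("TransactionCommandAdvanced", 13);
}
```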

View File

@ -21,6 +21,8 @@
namespace distributed {
class DataManager;
/// Handles the execution of a plan on the worker, requested by the remote
/// master. Assumes that (tx id, command id, plan id) uniquely identifies an
/// execution, and that there will never be parallel requests for the same
@ -64,7 +66,8 @@ class ProduceRpcServer {
public:
ProduceRpcServer(database::GraphDb &db, tx::Engine &tx_engine,
communication::rpc::Server &server,
const distributed::PlanConsumer &plan_consumer);
const PlanConsumer &plan_consumer,
DataManager *data_manager);
/// Finish and clear ongoing produces for all plans that are tied to a
/// transaction with tx_id.

View File

@ -17,6 +17,7 @@ cpp<#
(lcp:in-impl
#>cpp
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/data_manager.hpp"
cpp<#)
@ -369,7 +370,14 @@ void PullResData::LoadGraphElement(
database::GraphDbAccessor *dba,
const distributed::capnp::TypedValue::Reader &reader,
query::TypedValue *value) {
auto load_vertex = [dba](const auto &vertex_reader) {
distributed::DataManager *data_manager = nullptr;
// TODO: Pass in a DistributedGraphDb or data_manager.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba->db())) {
data_manager = &distributed_db->data_manager();
}
CHECK(data_manager);
auto load_vertex = [dba, data_manager](const auto &vertex_reader) {
storage::VertexAddress global_address(vertex_reader.getAddress());
auto old_record =
vertex_reader.hasOld()
@ -381,14 +389,12 @@ void PullResData::LoadGraphElement(
? distributed::LoadVertex<const distributed::capnp::Vertex::Reader>(
vertex_reader.getNew())
: nullptr;
dba->db()
.data_manager()
.Elements<Vertex>(dba->transaction_id())
data_manager->Elements<Vertex>(dba->transaction_id())
.emplace(global_address.gid(), std::move(old_record),
std::move(new_record));
return VertexAccessor(global_address, *dba);
};
auto load_edge = [dba](const auto &edge_reader) {
auto load_edge = [dba, data_manager](const auto &edge_reader) {
storage::EdgeAddress global_address(edge_reader.getAddress());
auto old_record =
edge_reader.hasOld()
@ -400,9 +406,7 @@ void PullResData::LoadGraphElement(
? distributed::LoadEdge<const distributed::capnp::Edge::Reader>(
edge_reader.getNew())
: nullptr;
dba->db()
.data_manager()
.Elements<Edge>(dba->transaction_id())
data_manager->Elements<Edge>(dba->transaction_id())
.emplace(global_address.gid(), std::move(old_record),
std::move(new_record));
return EdgeAccessor(global_address, *dba);
@ -430,7 +434,7 @@ void PullResData::LoadGraphElement(
}
}
cpp<#)
cpp<#)
(lcp:define-rpc pull
(:request

View File

@ -8,7 +8,7 @@ class Server;
}
namespace database {
class GraphDb;
class DistributedGraphDb;
};
namespace distributed {
@ -19,7 +19,7 @@ namespace distributed {
/// step in the same time.
class TokenSharingRpcServer {
public:
TokenSharingRpcServer(database::GraphDb *db, int worker_id,
TokenSharingRpcServer(database::DistributedGraphDb *db, int worker_id,
distributed::Coordination *coordination,
communication::rpc::Server *server,
distributed::TokenSharingRpcClients *clients)

View File

@ -13,6 +13,7 @@
#include "communication/bolt/v1/session.hpp"
#include "config.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "glue/conversion.hpp"
@ -64,7 +65,7 @@ DECLARE_string(durability_directory);
/** Encapsulates Dbms and Interpreter that are passed through the network server
* and worker to the session. */
struct SessionData {
database::MasterBase &db;
database::GraphDb &db;
query::Interpreter interpreter{db};
};

View File

@ -3,6 +3,7 @@
#include <glog/logging.h>
#include <limits>
#include "database/distributed_graph_db.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/ast/cypher_main_visitor.hpp"
@ -46,9 +47,11 @@ Interpreter::CachedPlan::~CachedPlan() {
}
Interpreter::Interpreter(database::GraphDb &db)
: plan_dispatcher_(db.type() == database::GraphDb::Type::DISTRIBUTED_MASTER
? &db.plan_dispatcher()
: nullptr) {}
: plan_dispatcher_(
db.type() == database::GraphDb::Type::DISTRIBUTED_MASTER
// TODO: Replace this with virtual call or some other mechanism.
? &dynamic_cast<database::Master *>(&db)->plan_dispatcher()
: nullptr) {}
Interpreter::Results Interpreter::operator()(
const std::string &query, database::GraphDbAccessor &db_accessor,

View File

@ -17,6 +17,7 @@
#include "glog/logging.h"
#include "communication/result_stream_faker.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "distributed/pull_rpc_clients.hpp"
@ -1190,17 +1191,23 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
DistributedExpandBfsCursor(const ExpandVariable &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self_.input_->MakeCursor(db)) {
subcursor_ids_ = db_.db().bfs_subcursor_clients().CreateBfsSubcursors(
// TODO: Pass in a DistributedGraphDb.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&db.db())) {
bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients();
}
CHECK(bfs_subcursor_clients_);
subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors(
db_.transaction_id(), self_.direction(), self_.edge_types(),
self_.graph_view());
db_.db().bfs_subcursor_clients().RegisterSubcursors(subcursor_ids_);
bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_);
VLOG(10) << "BFS subcursors initialized";
pull_pos_ = subcursor_ids_.end();
}
~DistributedExpandBfsCursor() {
VLOG(10) << "Removing BFS subcursors";
db_.db().bfs_subcursor_clients().RemoveBfsSubcursors(subcursor_ids_);
bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_);
}
bool Pull(Frame &frame, Context &context) override {
@ -1218,8 +1225,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
if (!skip_rest_) {
if (current_depth_ >= lower_bound_) {
for (; pull_pos_ != subcursor_ids_.end(); ++pull_pos_) {
auto vertex = db_.db().bfs_subcursor_clients().Pull(
pull_pos_->first, pull_pos_->second, &db_);
auto vertex = bfs_subcursor_clients_->Pull(pull_pos_->first,
pull_pos_->second, &db_);
if (vertex) {
last_vertex = *vertex;
SwitchAccessor(last_vertex.ValueVertex(), self_.graph_view_);
@ -1263,9 +1270,9 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
"`current_vertex_addr` "
"should be set during path reconstruction";
auto ret = current_edge_addr
? db_.db().bfs_subcursor_clients().ReconstructPath(
? bfs_subcursor_clients_->ReconstructPath(
subcursor_ids_, *current_edge_addr, &db_)
: db_.db().bfs_subcursor_clients().ReconstructPath(
: bfs_subcursor_clients_->ReconstructPath(
subcursor_ids_, *current_vertex_addr, &db_);
edges.insert(edges.end(), ret.edges.begin(), ret.edges.end());
current_vertex_addr = ret.next_vertex;
@ -1286,9 +1293,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
if (current_depth_ < upper_bound_) {
VLOG(10) << "Trying to expand again...";
current_depth_++;
db_.db().bfs_subcursor_clients().PrepareForExpand(subcursor_ids_,
false);
if (db_.db().bfs_subcursor_clients().ExpandLevel(subcursor_ids_)) {
bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, false);
if (bfs_subcursor_clients_->ExpandLevel(subcursor_ids_)) {
continue;
}
}
@ -1321,21 +1327,21 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
VLOG(10) << "Starting BFS from " << vertex << " with limits "
<< lower_bound_ << ".." << upper_bound_;
db_.db().bfs_subcursor_clients().PrepareForExpand(subcursor_ids_, true);
db_.db().bfs_subcursor_clients().SetSource(subcursor_ids_,
vertex.GlobalAddress());
bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true);
bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress());
current_depth_ = 1;
}
}
void Reset() override {
db_.db().bfs_subcursor_clients().ResetSubcursors(subcursor_ids_);
bfs_subcursor_clients_->ResetSubcursors(subcursor_ids_);
pull_pos_ = subcursor_ids_.end();
}
private:
const ExpandVariable &self_;
database::GraphDbAccessor &db_;
distributed::BfsRpcClients *bfs_subcursor_clients_{nullptr};
const std::unique_ptr<query::plan::Cursor> input_cursor_;
// Depth bounds. Calculated on each pull from the input, the initial value
@ -3194,11 +3200,17 @@ namespace {
*/
class RemotePuller {
public:
RemotePuller(database::GraphDbAccessor &db,
RemotePuller(distributed::PullRpcClients *pull_clients,
database::GraphDbAccessor &db,
const std::vector<Symbol> &symbols, int64_t plan_id,
tx::CommandId command_id)
: db_(db), symbols_(symbols), plan_id_(plan_id), command_id_(command_id) {
worker_ids_ = db_.db().pull_clients().GetWorkerIds();
: pull_clients_(pull_clients),
db_(db),
symbols_(symbols),
plan_id_(plan_id),
command_id_(command_id) {
CHECK(pull_clients_);
worker_ids_ = pull_clients_->GetWorkerIds();
// Remove master from the worker ids list.
worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0));
}
@ -3280,7 +3292,7 @@ class RemotePuller {
}
void Reset() {
worker_ids_ = db_.db().pull_clients().GetWorkerIds();
worker_ids_ = pull_clients_->GetWorkerIds();
// Remove master from the worker ids list.
worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0));
@ -3289,8 +3301,7 @@ class RemotePuller {
// during its pull.
remote_pulls_.clear();
for (auto &worker_id : worker_ids_) {
db_.db().pull_clients().ResetCursor(&db_, worker_id, plan_id_,
command_id_);
pull_clients_->ResetCursor(&db_, worker_id, plan_id_, command_id_);
}
remote_results_.clear();
remote_pulls_initialized_ = false;
@ -3330,6 +3341,7 @@ class RemotePuller {
}
private:
distributed::PullRpcClients *pull_clients_{nullptr};
database::GraphDbAccessor &db_;
std::vector<Symbol> symbols_;
int64_t plan_id_;
@ -3341,7 +3353,7 @@ class RemotePuller {
bool remote_pulls_initialized_ = false;
void UpdatePullForWorker(int worker_id, Context &context) {
remote_pulls_[worker_id] = db_.db().pull_clients().Pull(
remote_pulls_[worker_id] = pull_clients_->Pull(
&db_, worker_id, plan_id_, command_id_, context.parameters_, symbols_,
context.timestamp_, false);
}
@ -3354,7 +3366,9 @@ class PullRemoteCursor : public Cursor {
input_cursor_(self.input() ? self.input()->MakeCursor(db) : nullptr),
command_id_(db.transaction().cid()),
remote_puller_(
RemotePuller(db, self.symbols(), self.plan_id(), command_id_)) {}
// TODO: Pass in a Master GraphDb.
&dynamic_cast<database::Master *>(&db.db())->pull_clients(), db,
self.symbols(), self.plan_id(), command_id_) {}
bool Pull(Frame &frame, Context &context) override {
if (context.db_accessor_.should_abort()) throw HintedAbortError();
@ -3449,6 +3463,15 @@ class SynchronizeCursor : public Cursor {
public:
SynchronizeCursor(const Synchronize &self, database::GraphDbAccessor &db)
: self_(self),
pull_clients_(
// TODO: Pass in a Master GraphDb.
&dynamic_cast<database::Master *>(&db.db())->pull_clients()),
updates_clients_(
// TODO: Pass in a Master GraphDb.
&dynamic_cast<database::Master *>(&db.db())->updates_clients()),
updates_server_(
// TODO: Pass in a Master GraphDb.
&dynamic_cast<database::Master *>(&db.db())->updates_server()),
input_cursor_(self.input()->MakeCursor(db)),
pull_remote_cursor_(
self.pull_remote() ? self.pull_remote()->MakeCursor(db) : nullptr),
@ -3495,6 +3518,9 @@ class SynchronizeCursor : public Cursor {
private:
const Synchronize &self_;
distributed::PullRpcClients *pull_clients_{nullptr};
distributed::UpdatesRpcClients *updates_clients_{nullptr};
distributed::UpdatesRpcServer *updates_server_{nullptr};
const std::unique_ptr<Cursor> input_cursor_;
const std::unique_ptr<Cursor> pull_remote_cursor_;
bool initial_pull_done_{false};
@ -3509,9 +3535,9 @@ class SynchronizeCursor : public Cursor {
// Tell all workers to accumulate, only if there is a remote pull.
std::vector<utils::Future<distributed::PullData>> worker_accumulations;
if (pull_remote_cursor_) {
for (auto worker_id : db.pull_clients().GetWorkerIds()) {
for (auto worker_id : pull_clients_->GetWorkerIds()) {
if (worker_id == db.WorkerId()) continue;
worker_accumulations.emplace_back(db.pull_clients().Pull(
worker_accumulations.emplace_back(pull_clients_->Pull(
&context.db_accessor_, worker_id, self_.pull_remote()->plan_id(),
command_id_, context.parameters_, self_.pull_remote()->symbols(),
context.timestamp_, true, 0));
@ -3569,9 +3595,8 @@ class SynchronizeCursor : public Cursor {
// Make all the workers apply their deltas.
auto tx_id = context.db_accessor_.transaction_id();
auto apply_futures =
db.updates_clients().UpdateApplyAll(db.WorkerId(), tx_id);
db.updates_server().Apply(tx_id);
auto apply_futures = updates_clients_->UpdateApplyAll(db.WorkerId(), tx_id);
updates_server_->Apply(tx_id);
for (auto &future : apply_futures) {
switch (future.get()) {
case distributed::UpdateResult::SERIALIZATION_ERROR:
@ -3592,8 +3617,7 @@ class SynchronizeCursor : public Cursor {
// If the command advanced, let the workers know.
if (self_.advance_command()) {
auto futures =
db.pull_clients().NotifyAllTransactionCommandAdvanced(tx_id);
auto futures = pull_clients_->NotifyAllTransactionCommandAdvanced(tx_id);
for (auto &future : futures) future.wait();
}
}
@ -3687,7 +3711,9 @@ class PullRemoteOrderByCursor : public Cursor {
input_(self.input()->MakeCursor(db)),
command_id_(db.transaction().cid()),
remote_puller_(
RemotePuller(db, self.symbols(), self.plan_id(), command_id_)) {}
// TODO: Pass in a Master GraphDb.
&dynamic_cast<database::Master *>(&db.db())->pull_clients(), db,
self.symbols(), self.plan_id(), command_id_) {}
bool Pull(Frame &frame, Context &context) {
if (context.db_accessor_.should_abort()) throw HintedAbortError();

View File

@ -1,5 +1,6 @@
#pragma once
#include "database/distributed_graph_db.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/exceptions.hpp"
@ -11,7 +12,7 @@ namespace query {
class TransactionEngine final {
public:
TransactionEngine(database::MasterBase &db, Interpreter &interpreter)
TransactionEngine(database::GraphDb &db, Interpreter &interpreter)
: db_(db), interpreter_(interpreter) {}
~TransactionEngine() { Abort(); }
@ -100,7 +101,7 @@ class TransactionEngine final {
}
private:
database::MasterBase &db_;
database::GraphDb &db_;
Interpreter &interpreter_;
std::unique_ptr<database::GraphDbAccessor> db_accessor_;
// The `query::Interpreter::Results` object MUST be destroyed before the
@ -124,9 +125,10 @@ class TransactionEngine final {
db_accessor_->AdvanceCommand();
// TODO: this logic shouldn't be here!
if (db_.type() == database::GraphDb::Type::DISTRIBUTED_MASTER) {
auto *master_db = dynamic_cast<database::Master *>(&db_);
auto tx_id = db_accessor_->transaction_id();
auto futures =
db_.pull_clients().NotifyAllTransactionCommandAdvanced(tx_id);
master_db->pull_clients().NotifyAllTransactionCommandAdvanced(tx_id);
for (auto &future : futures) future.wait();
}
}
@ -140,4 +142,5 @@ class TransactionEngine final {
}
}
};
} // namespace query

View File

@ -4,6 +4,7 @@
#include <unordered_map>
#include <vector>
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "query/exceptions.hpp"
@ -23,7 +24,8 @@ DEFINE_VALIDATED_int32(dgp_max_batch_size, 2000,
"one dynamic graph partitioner step.",
FLAG_IN_RANGE(1, std::numeric_limits<int32_t>::max()));
DynamicGraphPartitioner::DynamicGraphPartitioner(database::GraphDb *db)
DynamicGraphPartitioner::DynamicGraphPartitioner(
database::DistributedGraphDb *db)
: db_(db) {}
void DynamicGraphPartitioner::Run() {

View File

@ -8,7 +8,7 @@
#include "storage/vertex_accessor.hpp"
namespace database {
class GraphDb;
class DistributedGraphDb;
class GraphDbAccessor;
}; // namespace database
@ -25,7 +25,7 @@ class DynamicGraphPartitioner {
delete;
DynamicGraphPartitioner &operator=(DynamicGraphPartitioner &&other) = delete;
explicit DynamicGraphPartitioner(database::GraphDb *db);
explicit DynamicGraphPartitioner(database::DistributedGraphDb *db);
/// Runs one dynamic graph partitioning cycle (step).
void Run();
@ -50,5 +50,5 @@ class DynamicGraphPartitioner {
const VertexAccessor &vertex) const;
private:
database::GraphDb *db_;
database::DistributedGraphDb *db_{nullptr};
};
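
Narrowing the constructor parameter from `GraphDb *` to `DistributedGraphDb *` moves the "distributed only" requirement from a runtime cast to the type system. A small sketch of the difference, again with hypothetical stand-in classes:

```
#include <iostream>

// Hypothetical stand-ins; not the real GraphDb hierarchy or partitioner.
struct GraphDbLike {
  virtual ~GraphDbLike() = default;
};

struct DistributedGraphDbLike : GraphDbLike {
  int WorkerId() const { return 0; }
};

class PartitionerLike {
 public:
  // Only distributed databases are acceptable, and the compiler enforces it.
  explicit PartitionerLike(DistributedGraphDbLike *db) : db_(db) {}
  void Run() { std::cout << "partitioning on worker " << db_->WorkerId() << "\n"; }

 private:
  DistributedGraphDbLike *db_{nullptr};
};

int main() {
  DistributedGraphDbLike distributed;
  PartitionerLike(&distributed).Run();
  // GraphDbLike single_node;
  // PartitionerLike(&single_node);  // would not compile
}
```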

View File

@ -1,5 +1,6 @@
#include "glog/logging.h"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
#include "distributed/data_manager.hpp"
@ -150,8 +151,15 @@ bool RecordAccessor<TRecord>::Reconstruct() const {
// TODO in write queries it's possible the command has been advanced and
// we need to invalidate the Cache and really get the latest stuff.
// But only do that after the command has been advanced.
auto &cache = dba.db().data_manager().template Elements<TRecord>(
dba.transaction_id());
distributed::DataManager *data_manager = nullptr;
// TODO: Replace this with virtual call or some other mechanism.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba.db())) {
data_manager = &distributed_db->data_manager();
}
CHECK(data_manager);
auto &cache =
data_manager->template Elements<TRecord>(dba.transaction_id());
cache.FindSetOldNew(dba.transaction().id_, address_.worker_id(),
address_.gid(), old_, new_);
}
@ -179,8 +187,15 @@ TRecord &RecordAccessor<TRecord>::update() const {
if (is_local()) {
new_ = address_.local()->update(t);
} else {
auto &cache = dba.db().data_manager().template Elements<TRecord>(
dba.transaction_id());
distributed::DataManager *data_manager = nullptr;
// TODO: Replace this with virtual call or some other mechanism.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba.db())) {
data_manager = &distributed_db->data_manager();
}
CHECK(data_manager);
auto &cache =
data_manager->template Elements<TRecord>(dba.transaction_id());
new_ = cache.FindNew(address_.gid());
}
DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
@ -202,8 +217,15 @@ void RecordAccessor<TRecord>::SendDelta(
DCHECK(!is_local())
<< "Only a delta created on a remote accessor should be sent";
auto result =
db_accessor().db().updates_clients().Update(address().worker_id(), delta);
auto &dba = db_accessor();
distributed::UpdatesRpcClients *updates_clients = nullptr;
// TODO: Replace this with virtual call or some other mechanism.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba.db())) {
updates_clients = &distributed_db->updates_clients();
}
CHECK(updates_clients);
auto result = updates_clients->Update(address().worker_id(), delta);
switch (result) {
case distributed::UpdateResult::DONE:
break;
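
`SendDelta` above ends by switching on the RPC result; the hunk is cut off after the `DONE` case, but presumably the remaining cases turn non-success values into errors. A self-contained sketch of that mapping, using a made-up `UpdateResultLike` enum rather than the real `distributed::UpdateResult` and its exception types:

```
#include <iostream>
#include <stdexcept>

// Hypothetical result enum; the real UpdateResult has its own set of values.
enum class UpdateResultLike { DONE, SERIALIZATION_ERROR, LOCK_TIMEOUT };

// A remote update returns a status code and the caller converts every
// non-success value into an exception.
void HandleUpdateResult(UpdateResultLike result) {
  switch (result) {
    case UpdateResultLike::DONE:
      break;
    case UpdateResultLike::SERIALIZATION_ERROR:
      throw std::runtime_error("serialization error on remote update");
    case UpdateResultLike::LOCK_TIMEOUT:
      throw std::runtime_error("lock timeout on remote update");
  }
}

int main() {
  HandleUpdateResult(UpdateResultLike::DONE);
  try {
    HandleUpdateResult(UpdateResultLike::LOCK_TIMEOUT);
  } catch (const std::exception &e) {
    std::cout << "caught: " << e.what() << "\n";
  }
}
```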

View File

@ -24,7 +24,7 @@ foreach(test_cpp ${test_type_cpps})
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
# link libraries
target_link_libraries(${target_name} memgraph_lib kvstore_dummy_lib)
target_link_libraries(${target_name} memgraph_lib mg-integrations kvstore_dummy_lib)
# google-benchmark
target_link_libraries(${target_name} benchmark Threads::Threads)

View File

@ -22,7 +22,7 @@ foreach(test_cpp ${test_type_cpps})
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
# link libraries
target_link_libraries(${target_name} memgraph_lib)
target_link_libraries(${target_name} memgraph_lib mg-integrations kvstore_dummy_lib)
# gtest
target_link_libraries(${target_name} gtest gmock gtest_main)

View File

@ -4,6 +4,7 @@
#include <vector>
#include "communication/result_stream_faker.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "glue/conversion.hpp"
#include "query/interpreter.hpp"

View File

@ -6,7 +6,7 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "database/graph_db.hpp"
#include "database/distributed_graph_db.hpp"
#include "query/interpreter.hpp"
#include "query/repl.hpp"
#include "utils/flag_validation.hpp"

View File

@ -1,7 +1,7 @@
#include "gtest/gtest.h"
#include "config.hpp"
#include "database/graph_db.hpp"
#include "database/distributed_graph_db.hpp"
TEST(DatabaseMaster, Instantiate) {
database::Config config;

View File

@ -5,7 +5,7 @@
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/data_manager.hpp"
#include "distributed/updates_rpc_server.hpp"
@ -163,52 +163,47 @@ class DistributedGraphDbTest : public ::testing::Test {
std::vector<std::unique_ptr<WorkerInThread>> workers_;
};
enum class TestType { SINGLE_NODE, DISTRIBUTED };
// Class that can be used both in distributed and single node tests.
class Cluster {
public:
Cluster(TestType test_type, int num_workers = 0) : test_type_(test_type) {
explicit Cluster(int num_workers = 0) {
using namespace std::literals::chrono_literals;
switch (test_type) {
case TestType::SINGLE_NODE:
master_ = std::make_unique<database::SingleNode>(database::Config{});
break;
case TestType::DISTRIBUTED:
database::Config master_config;
master_config.master_endpoint = {kLocal, 0};
database::Config master_config;
master_config.master_endpoint = {kLocal, 0};
auto master_tmp = std::make_unique<database::Master>(master_config);
auto master_endpoint = master_tmp->endpoint();
master_ = std::move(master_tmp);
auto master_tmp = std::make_unique<database::Master>(master_config);
auto master_endpoint = master_tmp->endpoint();
master_ = std::move(master_tmp);
const auto kInitTime = 200ms;
std::this_thread::sleep_for(kInitTime);
const auto kInitTime = 200ms;
std::this_thread::sleep_for(kInitTime);
auto worker_config = [this, master_endpoint](int worker_id) {
database::Config config;
config.worker_id = worker_id;
config.master_endpoint = master_endpoint;
config.worker_endpoint = {kLocal, 0};
return config;
};
auto worker_config = [this, master_endpoint](int worker_id) {
database::Config config;
config.worker_id = worker_id;
config.master_endpoint = master_endpoint;
config.worker_endpoint = {kLocal, 0};
return config;
};
for (int i = 0; i < num_workers; ++i) {
workers_.emplace_back(
std::make_unique<WorkerInThread>(worker_config(i + 1)));
}
std::this_thread::sleep_for(kInitTime);
break;
for (int i = 0; i < num_workers; ++i) {
workers_.emplace_back(
std::make_unique<WorkerInThread>(worker_config(i + 1)));
}
std::this_thread::sleep_for(kInitTime);
}
Cluster(const Cluster &) = delete;
Cluster(Cluster &&) = delete;
Cluster &operator=(const Cluster &) = delete;
Cluster &operator=(Cluster &&) = delete;
~Cluster() {
auto t = std::thread([this] { master_ = nullptr; });
workers_.clear();
if (t.joinable()) t.join();
}
database::GraphDb *master() { return master_.get(); }
auto *master() { return master_.get(); }
auto workers() {
return iter::imap([](auto &worker) { return worker->db(); }, workers_);
}
@ -221,38 +216,23 @@ class Cluster {
}
void ApplyUpdates(tx::TransactionId tx_id) {
switch (test_type_) {
case TestType::SINGLE_NODE:
break;
case TestType::DISTRIBUTED:
master()->updates_server().Apply(tx_id);
for (auto member : workers()) {
member->updates_server().Apply(tx_id);
}
ClearCache(tx_id);
master()->updates_server().Apply(tx_id);
for (auto member : workers()) {
member->updates_server().Apply(tx_id);
}
ClearCache(tx_id);
}
void AdvanceCommand(tx::TransactionId tx_id) {
switch (test_type_) {
case TestType::SINGLE_NODE: {
database::GraphDbAccessor dba{*master(), tx_id};
dba.AdvanceCommand();
break;
}
case TestType::DISTRIBUTED:
ApplyUpdates(tx_id);
master()->tx_engine().Advance(tx_id);
for (auto worker : workers()) worker->tx_engine().UpdateCommand(tx_id);
ClearCache(tx_id);
break;
}
ApplyUpdates(tx_id);
master()->tx_engine().Advance(tx_id);
for (auto worker : workers()) worker->tx_engine().UpdateCommand(tx_id);
ClearCache(tx_id);
}
private:
const std::string kLocal = "127.0.0.1";
TestType test_type_;
std::unique_ptr<database::GraphDb> master_;
std::unique_ptr<database::Master> master_;
std::vector<std::unique_ptr<WorkerInThread>> workers_;
};
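
The test `Cluster` destructor is worth a note: it starts destroying the master on a separate thread while the workers are being cleared, then joins, so master and workers shut down concurrently. A standalone sketch of that teardown order with trivial stand-in classes:

```
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical master/worker stand-ins; the real classes do RPC shutdown work
// in their destructors, which is why the concurrent teardown matters.
struct MasterLike {
  ~MasterLike() { std::cout << "master shut down\n"; }
};
struct WorkerLike {
  ~WorkerLike() { std::cout << "worker shut down\n"; }
};

class ClusterLike {
 public:
  explicit ClusterLike(int num_workers) : master_(std::make_unique<MasterLike>()) {
    for (int i = 0; i < num_workers; ++i)
      workers_.emplace_back(std::make_unique<WorkerLike>());
  }

  // Destroy the master on its own thread, tear the workers down in parallel,
  // then wait for the master to finish.
  ~ClusterLike() {
    auto t = std::thread([this] { master_ = nullptr; });
    workers_.clear();
    if (t.joinable()) t.join();
  }

 private:
  std::unique_ptr<MasterLike> master_;
  std::vector<std::unique_ptr<WorkerLike>> workers_;
};

int main() { ClusterLike cluster(2); }
```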

View File

@ -26,8 +26,8 @@
#include "query_plan_common.hpp"
#include "transactions/engine_master.hpp"
using database::GraphDbAccessor;
using namespace distributed;
using namespace database;
using namespace std::literals::chrono_literals;
class DistributedGraphDb : public DistributedGraphDbTest {

View File

@ -306,7 +306,7 @@ TEST_F(DistributedInterpretationTest, AdvanceCommandOnWorkers) {
RunWithDba("UNWIND RANGE(1, 10) as x CREATE (:A {id: x})", dba);
dba.AdvanceCommand();
// Advance commands on workers also.
auto futures = dba.db().pull_clients().NotifyAllTransactionCommandAdvanced(
auto futures = master().pull_clients().NotifyAllTransactionCommandAdvanced(
dba.transaction_id());
for (auto &future : futures) future.wait();

View File

@ -296,8 +296,14 @@ class DistributedDetachDeleteTest : public DistributedGraphDbTest {
accessor.DetachRemoveVertex(v_accessor);
for (auto db_accessor : dba) {
ASSERT_EQ(db_accessor.get().db().updates_server().Apply(
dba[0].get().transaction_id()),
distributed::UpdatesRpcServer *updates_server = nullptr;
auto *db = &db_accessor.get().db();
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(db)) {
updates_server = &distributed_db->updates_server();
}
ASSERT_TRUE(updates_server);
ASSERT_EQ(updates_server->Apply(dba[0].get().transaction_id()),
distributed::UpdateResult::DONE);
}

View File

@ -828,11 +828,30 @@ struct hash<std::pair<int, int>> {
};
} // namespace std
enum class TestType { SINGLE_NODE, DISTRIBUTED };
/** A test fixture for breadth first expansion */
class QueryPlanExpandBfs
: public testing::TestWithParam<std::pair<TestType, int>> {
private:
std::unique_ptr<Cluster> cluster_;
std::unique_ptr<database::SingleNode> single_node_;
database::GraphDb *db_{nullptr};
std::unordered_map<std::pair<int, int>, storage::EdgeAddress> e_;
protected:
QueryPlanExpandBfs() : cluster(GetParam().first, GetParam().second) {}
QueryPlanExpandBfs()
: cluster_(GetParam().first == TestType::DISTRIBUTED
? new Cluster(GetParam().second)
: nullptr),
single_node_(GetParam().first == TestType::DISTRIBUTED
? nullptr
: new database::SingleNode()),
db_([&]() -> database::GraphDb * {
if (cluster_) return cluster_->master();
return single_node_.get();
}()),
dba(*db_) {}
// Worker IDs where vertices are located.
const std::vector<int> vertices = {0, 1, 1, 0, 1, 2};
@ -842,18 +861,14 @@ class QueryPlanExpandBfs
// Style-guide non-conformant name due to PROPERTY_PAIR and PROPERTY_LOOKUP
// macro requirements.
Cluster cluster;
database::GraphDb &db{*cluster.master()};
database::GraphDbAccessor dba{db};
std::pair<std::string, storage::Property> prop = PROPERTY_PAIR("property");
storage::EdgeType edge_type = dba.EdgeType("edge_type");
database::GraphDbAccessor dba;
std::vector<storage::VertexAddress> v;
std::unordered_map<std::pair<int, int>, storage::EdgeAddress> e;
AstStorage storage;
SymbolTable symbol_table;
std::pair<std::string, storage::Property> prop = PROPERTY_PAIR("property");
storage::EdgeType edge_type = dba.EdgeType("edge_type");
// Inner edge and vertex symbols.
// Edge from a to b has `prop` with the value ab (all ints).
Symbol inner_edge = symbol_table.CreateSymbol("inner_edge", true);
@ -879,10 +894,10 @@ class QueryPlanExpandBfs
VertexAccessor to(v[p.second], dba);
auto edge = dba.InsertEdge(from, to, edge_type);
edge.PropsSet(prop.second, p.first * 10 + p.second);
e.emplace(p, edge.GlobalAddress());
e_.emplace(p, edge.GlobalAddress());
}
cluster.AdvanceCommand(dba.transaction_id());
AdvanceCommand(dba.transaction_id());
}
// Defines and performs a breadth-first expansion with the given parameters.
@ -948,6 +963,18 @@ class QueryPlanExpandBfs
}
return true;
}
void ApplyUpdates(tx::TransactionId tx_id) {
if (GetParam().first == TestType::DISTRIBUTED)
cluster_->ApplyUpdates(tx_id);
}
void AdvanceCommand(tx::TransactionId tx_id) {
if (GetParam().first == TestType::DISTRIBUTED)
cluster_->AdvanceCommand(tx_id);
else
database::GraphDbAccessor(*db_, tx_id).AdvanceCommand();
}
};
TEST_P(QueryPlanExpandBfs, Basic) {
@ -1081,32 +1108,32 @@ TEST_P(QueryPlanExpandBfs, GraphState) {
v.push_back(to.GlobalAddress());
dba.InsertEdge(from, to, edge_type);
cluster.ApplyUpdates(dba.transaction_id());
ApplyUpdates(dba.transaction_id());
}
EXPECT_EQ(ExpandSize(GraphView::OLD), 5);
EXPECT_EQ(ExpandSize(GraphView::NEW), 6);
cluster.AdvanceCommand(dba.transaction_id());
AdvanceCommand(dba.transaction_id());
EXPECT_EQ(ExpandSize(GraphView::OLD), 6);
EXPECT_EQ(ExpandSize(GraphView::NEW), 6);
{
v.push_back(dba.InsertVertex().GlobalAddress());
cluster.AdvanceCommand(dba.transaction_id());
AdvanceCommand(dba.transaction_id());
auto from = VertexAccessor(v[4], dba);
auto to = VertexAccessor(v[7], dba);
dba.InsertEdge(from, to, edge_type);
cluster.ApplyUpdates(dba.transaction_id());
ApplyUpdates(dba.transaction_id());
}
EXPECT_EQ(ExpandSize(GraphView::OLD), 6);
EXPECT_EQ(ExpandSize(GraphView::NEW), 7);
cluster.AdvanceCommand(dba.transaction_id());
AdvanceCommand(dba.transaction_id());
EXPECT_EQ(ExpandSize(GraphView::OLD), 7);
EXPECT_EQ(ExpandSize(GraphView::NEW), 7);
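
The reworked fixture constructs exactly one backend and picks `db_` with an immediately-invoked lambda in the member-initializer list. A stripped-down sketch of that selection logic, without gtest and with invented `*Like` types standing in for `SingleNode` and `Cluster`:

```
#include <iostream>
#include <memory>

// Hypothetical stand-ins for SingleNode / Cluster and their common interface.
struct GraphDbLike {
  virtual ~GraphDbLike() = default;
  virtual const char *Name() const = 0;
};
struct SingleNodeLike : GraphDbLike {
  const char *Name() const override { return "single node"; }
};
struct ClusterLike {
  GraphDbLike *master() { return &master_; }
  struct MasterLike : GraphDbLike {
    const char *Name() const override { return "distributed master"; }
  } master_;
};

enum class TestType { SINGLE_NODE, DISTRIBUTED };

// Exactly one backend is constructed; db_ is selected afterwards with an
// immediately-invoked lambda, relying on member declaration order.
struct BfsFixtureLike {
  explicit BfsFixtureLike(TestType type)
      : cluster_(type == TestType::DISTRIBUTED ? new ClusterLike() : nullptr),
        single_node_(type == TestType::DISTRIBUTED ? nullptr : new SingleNodeLike()),
        db_([&]() -> GraphDbLike * {
          if (cluster_) return cluster_->master();
          return single_node_.get();
        }()) {}

  std::unique_ptr<ClusterLike> cluster_;
  std::unique_ptr<SingleNodeLike> single_node_;
  GraphDbLike *db_{nullptr};
};

int main() {
  std::cout << BfsFixtureLike(TestType::SINGLE_NODE).db_->Name() << "\n";
  std::cout << BfsFixtureLike(TestType::DISTRIBUTED).db_->Name() << "\n";
}
```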