diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 6552d4351..d3c7e9a7f 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -1,5 +1,5 @@ -#include "communication/messaging/distributed.hpp" #include "database/graph_db.hpp" +#include "communication/messaging/distributed.hpp" #include "distributed/coordination_master.hpp" #include "distributed/coordination_worker.hpp" #include "durability/paths.hpp" @@ -18,26 +18,22 @@ using namespace storage; namespace database { namespace impl { -class Base { +class PrivateBase : public GraphDb { public: - explicit Base(const Config &config) : config_(config) {} - virtual ~Base() {} + explicit PrivateBase(const Config &config) : config_(config) {} + virtual ~PrivateBase() {} const Config config_; - virtual Storage &storage() = 0; - virtual StorageGc &storage_gc() = 0; - virtual durability::WriteAheadLog &wal() = 0; - virtual tx::Engine &tx_engine() = 0; - virtual ConcurrentIdMapper<Label> &label_mapper() = 0; - virtual ConcurrentIdMapper<EdgeType> &edge_type_mapper() = 0; - virtual ConcurrentIdMapper<Property> &property_mapper() = 0; - virtual database::Counters &counters() = 0; + Storage &storage() override { return storage_; } + durability::WriteAheadLog &wal() override { return wal_; } + int WorkerId() const override { return config_.worker_id; } - Base(const Base &) = delete; - Base(Base &&) = delete; - Base &operator=(const Base &) = delete; - Base &operator=(Base &&) = delete; + protected: + Storage storage_{config_.worker_id}; + durability::WriteAheadLog wal_{config_.worker_id, + config_.durability_directory, + config_.durability_enabled}; }; template <template <typename TId> class TMapper> @@ -51,49 +47,39 @@ struct TypemapPack { TMapper<Property> property; }; -#define IMPL_GETTERS \ - Storage &storage() override { return storage_; } \ - StorageGc &storage_gc() override { return storage_gc_; } \ - durability::WriteAheadLog &wal() override { return wal_; } \ - tx::Engine &tx_engine() override { return tx_engine_; } \ - ConcurrentIdMapper<Label> &label_mapper() override { \ - return typemap_pack_.label; \ - } \ - ConcurrentIdMapper<EdgeType> &edge_type_mapper() override { \ - return typemap_pack_.edge_type; \ - } \ - ConcurrentIdMapper<Property> &property_mapper() override { \ - return typemap_pack_.property; \ - } \ - database::Counters &counters() override { return counters_; } +#define IMPL_GETTERS \ + tx::Engine &tx_engine() override { return tx_engine_; } \ + ConcurrentIdMapper<Label> &label_mapper() override { \ + return typemap_pack_.label; \ + } \ + ConcurrentIdMapper<EdgeType> &edge_type_mapper() override { \ + return typemap_pack_.edge_type; \ + } \ + ConcurrentIdMapper<Property> &property_mapper() override { \ + return typemap_pack_.property; \ + } \ + database::Counters &counters() override { return counters_; } \ + void CollectGarbage() override { storage_gc_.CollectGarbage(); } -class SingleNode : public Base { +class SingleNode : public PrivateBase { public: - explicit SingleNode(const Config &config) : Base(config) {} + explicit SingleNode(const Config &config) : PrivateBase(config) {} IMPL_GETTERS private: - Storage storage_{0}; - durability::WriteAheadLog wal_{config_.worker_id, - config_.durability_directory, - config_.durability_enabled}; tx::SingleNodeEngine tx_engine_{&wal_}; StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec}; TypemapPack<SingleNodeConcurrentIdMapper> typemap_pack_; database::SingleNodeCounters counters_; }; -class Master : public Base { +class Master : public PrivateBase { public: - explicit Master(const Config &config) : Base(config) {} + explicit Master(const Config &config) : PrivateBase(config) {} IMPL_GETTERS private: communication::messaging::System system_{config_.master_endpoint}; - Storage storage_{0}; - durability::WriteAheadLog wal_{config_.worker_id, - config_.durability_directory, - config_.durability_enabled}; tx::MasterEngine tx_engine_{system_, &wal_}; StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec}; distributed::MasterCoordination coordination{system_}; @@ -101,9 +87,9 @@ class Master : public Base { database::MasterCounters counters_{system_}; }; -class Worker : public Base { +class Worker : public PrivateBase { public: - explicit Worker(const Config &config) : Base(config) {} + explicit Worker(const Config &config) : PrivateBase(config) {} IMPL_GETTERS void WaitForShutdown() { coordination_.WaitForShutdown(); } @@ -112,11 +98,7 @@ class Worker : public Base { distributed::WorkerCoordination coordination_{system_, config_.master_endpoint}; tx::WorkerEngine tx_engine_{system_, config_.master_endpoint}; - Storage storage_{config_.worker_id}; StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec}; - durability::WriteAheadLog wal_{config_.worker_id, - config_.durability_directory, - config_.durability_enabled}; TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{system_, config_.master_endpoint}; database::WorkerCounters counters_{system_, config_.master_endpoint}; @@ -124,16 +106,15 @@ class Worker : public Base { #undef IMPL_GETTERS -} // namespace impl - -GraphDb::GraphDb(std::unique_ptr<impl::Base> impl) : impl_(std::move(impl)) { +PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl) + : impl_(std::move(impl)) { if (impl_->config_.durability_enabled) durability::CheckDurabilityDir(impl_->config_.durability_directory); if (impl_->config_.db_recover_on_startup) - durability::Recover(impl_->config_.durability_directory, *this); + durability::Recover(impl_->config_.durability_directory, *impl_); if (impl_->config_.durability_enabled) { - wal().Enable(); + impl_->wal().Enable(); snapshot_creator_ = std::make_unique<Scheduler>(); snapshot_creator_->Run( std::chrono::seconds(impl_->config_.snapshot_cycle_sec), @@ -141,38 +122,30 @@ GraphDb::GraphDb(std::unique_ptr<impl::Base> impl) : impl_(std::move(impl)) { } } -GraphDb::~GraphDb() { +PublicBase::~PublicBase() { snapshot_creator_.release(); if (impl_->config_.snapshot_on_exit) MakeSnapshot(); } -int GraphDb::WorkerId() const { return impl_->config_.worker_id; } - -Storage &GraphDb::storage() { return impl_->storage(); } - -durability::WriteAheadLog &GraphDb::wal() { return impl_->wal(); } - -tx::Engine &GraphDb::tx_engine() { return impl_->tx_engine(); } - -ConcurrentIdMapper<Label> &GraphDb::label_mapper() { +Storage &PublicBase::storage() { return impl_->storage(); } +durability::WriteAheadLog &PublicBase::wal() { return impl_->wal(); } +tx::Engine &PublicBase::tx_engine() { return impl_->tx_engine(); } +ConcurrentIdMapper<Label> &PublicBase::label_mapper() { return impl_->label_mapper(); } - -ConcurrentIdMapper<EdgeType> &GraphDb::edge_type_mapper() { +ConcurrentIdMapper<EdgeType> &PublicBase::edge_type_mapper() { return impl_->edge_type_mapper(); } - -ConcurrentIdMapper<Property> &GraphDb::property_mapper() { +ConcurrentIdMapper<Property> &PublicBase::property_mapper() { return impl_->property_mapper(); } +database::Counters &PublicBase::counters() { return impl_->counters(); } +void PublicBase::CollectGarbage() { impl_->CollectGarbage(); } +int PublicBase::WorkerId() const { return impl_->WorkerId(); } -database::Counters &GraphDb::counters() { return impl_->counters(); } - -void GraphDb::CollectGarbage() { impl_->storage_gc().CollectGarbage(); } - -void GraphDb::MakeSnapshot() { +void PublicBase::MakeSnapshot() { const bool status = durability::MakeSnapshot( - *this, fs::path(impl_->config_.durability_directory), + *impl_, fs::path(impl_->config_.durability_directory), impl_->config_.snapshot_max_retained); if (status) { LOG(INFO) << "Snapshot created successfully." << std::endl; @@ -180,26 +153,34 @@ void GraphDb::MakeSnapshot() { LOG(ERROR) << "Snapshot creation failed!" << std::endl; } } +} // namespace impl -MasterBase::MasterBase(std::unique_ptr<impl::Base> impl) - : GraphDb(std::move(impl)) { +MasterBase::MasterBase(std::unique_ptr<impl::PrivateBase> impl) + : PublicBase(std::move(impl)) { if (impl_->config_.query_execution_time_sec != -1) { transaction_killer_.Run( std::chrono::seconds(std::max( 1, std::min(5, impl_->config_.query_execution_time_sec / 4))), [this]() { - tx_engine().LocalForEachActiveTransaction([this](tx::Transaction &t) { - if (t.creation_time() + - std::chrono::seconds( - impl_->config_.query_execution_time_sec) < - std::chrono::steady_clock::now()) { - t.set_should_abort(); - }; - }); + impl_->tx_engine().LocalForEachActiveTransaction( + [this](tx::Transaction &t) { + if (t.creation_time() + + std::chrono::seconds( + impl_->config_.query_execution_time_sec) < + std::chrono::steady_clock::now()) { + t.set_should_abort(); + }; + }); }); } } +MasterBase::~MasterBase() { + is_accepting_transactions_ = false; + tx_engine().LocalForEachActiveTransaction( + [](auto &t) { t.set_should_abort(); }); +} + SingleNode::SingleNode(Config config) : MasterBase(std::make_unique<impl::SingleNode>(config)) {} @@ -207,7 +188,7 @@ Master::Master(Config config) : MasterBase(std::make_unique<impl::Master>(config)) {} Worker::Worker(Config config) - : GraphDb(std::make_unique<impl::Worker>(config)) {} + : PublicBase(std::make_unique<impl::Worker>(config)) {} void Worker::WaitForShutdown() { dynamic_cast<impl::Worker *>(impl_.get())->WaitForShutdown(); diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 122b46d4e..c626e640c 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -39,10 +39,6 @@ struct Config { io::network::Endpoint worker_endpoint; }; -namespace impl { -class Base; -} - /** * An abstract base class for a SingleNode/Master/Worker graph db. * @@ -68,38 +64,65 @@ class Base; */ class GraphDb { public: - Storage &storage(); - durability::WriteAheadLog &wal(); - tx::Engine &tx_engine(); - storage::ConcurrentIdMapper<storage::Label> &label_mapper(); - storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper(); - storage::ConcurrentIdMapper<storage::Property> &property_mapper(); - database::Counters &counters(); - void CollectGarbage(); - int WorkerId() const; + GraphDb() {} + virtual ~GraphDb() {} + + virtual Storage &storage() = 0; + virtual durability::WriteAheadLog &wal() = 0; + virtual tx::Engine &tx_engine() = 0; + virtual storage::ConcurrentIdMapper<storage::Label> &label_mapper() = 0; + virtual storage::ConcurrentIdMapper<storage::EdgeType> + &edge_type_mapper() = 0; + virtual storage::ConcurrentIdMapper<storage::Property> &property_mapper() = 0; + virtual database::Counters &counters() = 0; + virtual void CollectGarbage() = 0; + virtual int WorkerId() const = 0; + + GraphDb(const GraphDb &) = delete; + GraphDb(GraphDb &&) = delete; + GraphDb &operator=(const GraphDb &) = delete; + GraphDb &operator=(GraphDb &&) = delete; +}; + +namespace impl { +// Private GraphDb implementations all inherit `PrivateBase`. +// Public GraphDb implementations all inherit `PublicBase`. +class PrivateBase; + +// Base class for all GraphDb implementations exposes to the client programmer. +// Encapsulates an instance of a private implementation of GraphDb and performs +// initialization and cleanup. +class PublicBase : public GraphDb { + public: + Storage &storage() override; + durability::WriteAheadLog &wal() override; + tx::Engine &tx_engine() override; + storage::ConcurrentIdMapper<storage::Label> &label_mapper() override; + storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override; + storage::ConcurrentIdMapper<storage::Property> &property_mapper() override; + database::Counters &counters() override; + void CollectGarbage() override; + int WorkerId() const override; protected: - explicit GraphDb(std::unique_ptr<impl::Base> impl); - virtual ~GraphDb(); + explicit PublicBase(std::unique_ptr<PrivateBase> impl); + ~PublicBase(); - std::unique_ptr<impl::Base> impl_; + std::unique_ptr<PrivateBase> impl_; private: std::unique_ptr<Scheduler> snapshot_creator_; void MakeSnapshot(); }; +} // namespace impl -class MasterBase : public GraphDb { +class MasterBase : public impl::PublicBase { public: - explicit MasterBase(std::unique_ptr<impl::Base> impl); + explicit MasterBase(std::unique_ptr<impl::PrivateBase> impl); bool is_accepting_transactions() const { return is_accepting_transactions_; } - ~MasterBase() { - is_accepting_transactions_ = false; - tx_engine().LocalForEachActiveTransaction( - [](auto &t) { t.set_should_abort(); }); - } + ~MasterBase(); private: /** When this is false, no new transactions should be created. */ @@ -117,7 +140,7 @@ class Master : public MasterBase { explicit Master(Config config = Config()); }; -class Worker : public GraphDb { +class Worker : public impl::PublicBase { public: explicit Worker(Config config = Config()); void WaitForShutdown();