Refactor GraphDb so ::impl classes inherit GraphDb
Summary: A slight insanity here... I realized I will need to create `GraphDbAccessor` instance (which need `&GraphDb`) within some members of `::impl` classes. Within those classes I can pass `this` to those members, if `this` is a valid `GraphDb`. Semantically it really is (at the moment), but heirarchically it wasn't. This diff changes that. `GraphDb` is now only an interface. `PublicBase` is the base for all the public classes, `PrivateBase` for the `::impl` classes. Seems to work. Oh yes, another thing to keep in mind when doing this is that I should avoid calling virtual functions in public classes (the motivation for the double heirarchy). Before this diff the getters weren't virtual, now they are, so I should have made all the appropriate changes in code as well. Buda, was this a task I could have delegated to you or Cula? Reviewers: teon.banek, dgleich, buda Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1120
This commit is contained in:
parent
68a75eeca2
commit
1953f3563f
@ -1,5 +1,5 @@
|
||||
#include "communication/messaging/distributed.hpp"
|
||||
#include "database/graph_db.hpp"
|
||||
#include "communication/messaging/distributed.hpp"
|
||||
#include "distributed/coordination_master.hpp"
|
||||
#include "distributed/coordination_worker.hpp"
|
||||
#include "durability/paths.hpp"
|
||||
@ -18,26 +18,22 @@ using namespace storage;
|
||||
namespace database {
|
||||
namespace impl {
|
||||
|
||||
class Base {
|
||||
class PrivateBase : public GraphDb {
|
||||
public:
|
||||
explicit Base(const Config &config) : config_(config) {}
|
||||
virtual ~Base() {}
|
||||
explicit PrivateBase(const Config &config) : config_(config) {}
|
||||
virtual ~PrivateBase() {}
|
||||
|
||||
const Config config_;
|
||||
|
||||
virtual Storage &storage() = 0;
|
||||
virtual StorageGc &storage_gc() = 0;
|
||||
virtual durability::WriteAheadLog &wal() = 0;
|
||||
virtual tx::Engine &tx_engine() = 0;
|
||||
virtual ConcurrentIdMapper<Label> &label_mapper() = 0;
|
||||
virtual ConcurrentIdMapper<EdgeType> &edge_type_mapper() = 0;
|
||||
virtual ConcurrentIdMapper<Property> &property_mapper() = 0;
|
||||
virtual database::Counters &counters() = 0;
|
||||
Storage &storage() override { return storage_; }
|
||||
durability::WriteAheadLog &wal() override { return wal_; }
|
||||
int WorkerId() const override { return config_.worker_id; }
|
||||
|
||||
Base(const Base &) = delete;
|
||||
Base(Base &&) = delete;
|
||||
Base &operator=(const Base &) = delete;
|
||||
Base &operator=(Base &&) = delete;
|
||||
protected:
|
||||
Storage storage_{config_.worker_id};
|
||||
durability::WriteAheadLog wal_{config_.worker_id,
|
||||
config_.durability_directory,
|
||||
config_.durability_enabled};
|
||||
};
|
||||
|
||||
template <template <typename TId> class TMapper>
|
||||
@ -51,49 +47,39 @@ struct TypemapPack {
|
||||
TMapper<Property> property;
|
||||
};
|
||||
|
||||
#define IMPL_GETTERS \
|
||||
Storage &storage() override { return storage_; } \
|
||||
StorageGc &storage_gc() override { return storage_gc_; } \
|
||||
durability::WriteAheadLog &wal() override { return wal_; } \
|
||||
tx::Engine &tx_engine() override { return tx_engine_; } \
|
||||
ConcurrentIdMapper<Label> &label_mapper() override { \
|
||||
return typemap_pack_.label; \
|
||||
} \
|
||||
ConcurrentIdMapper<EdgeType> &edge_type_mapper() override { \
|
||||
return typemap_pack_.edge_type; \
|
||||
} \
|
||||
ConcurrentIdMapper<Property> &property_mapper() override { \
|
||||
return typemap_pack_.property; \
|
||||
} \
|
||||
database::Counters &counters() override { return counters_; }
|
||||
#define IMPL_GETTERS \
|
||||
tx::Engine &tx_engine() override { return tx_engine_; } \
|
||||
ConcurrentIdMapper<Label> &label_mapper() override { \
|
||||
return typemap_pack_.label; \
|
||||
} \
|
||||
ConcurrentIdMapper<EdgeType> &edge_type_mapper() override { \
|
||||
return typemap_pack_.edge_type; \
|
||||
} \
|
||||
ConcurrentIdMapper<Property> &property_mapper() override { \
|
||||
return typemap_pack_.property; \
|
||||
} \
|
||||
database::Counters &counters() override { return counters_; } \
|
||||
void CollectGarbage() override { storage_gc_.CollectGarbage(); }
|
||||
|
||||
class SingleNode : public Base {
|
||||
class SingleNode : public PrivateBase {
|
||||
public:
|
||||
explicit SingleNode(const Config &config) : Base(config) {}
|
||||
explicit SingleNode(const Config &config) : PrivateBase(config) {}
|
||||
IMPL_GETTERS
|
||||
|
||||
private:
|
||||
Storage storage_{0};
|
||||
durability::WriteAheadLog wal_{config_.worker_id,
|
||||
config_.durability_directory,
|
||||
config_.durability_enabled};
|
||||
tx::SingleNodeEngine tx_engine_{&wal_};
|
||||
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
|
||||
TypemapPack<SingleNodeConcurrentIdMapper> typemap_pack_;
|
||||
database::SingleNodeCounters counters_;
|
||||
};
|
||||
|
||||
class Master : public Base {
|
||||
class Master : public PrivateBase {
|
||||
public:
|
||||
explicit Master(const Config &config) : Base(config) {}
|
||||
explicit Master(const Config &config) : PrivateBase(config) {}
|
||||
IMPL_GETTERS
|
||||
|
||||
private:
|
||||
communication::messaging::System system_{config_.master_endpoint};
|
||||
Storage storage_{0};
|
||||
durability::WriteAheadLog wal_{config_.worker_id,
|
||||
config_.durability_directory,
|
||||
config_.durability_enabled};
|
||||
tx::MasterEngine tx_engine_{system_, &wal_};
|
||||
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
|
||||
distributed::MasterCoordination coordination{system_};
|
||||
@ -101,9 +87,9 @@ class Master : public Base {
|
||||
database::MasterCounters counters_{system_};
|
||||
};
|
||||
|
||||
class Worker : public Base {
|
||||
class Worker : public PrivateBase {
|
||||
public:
|
||||
explicit Worker(const Config &config) : Base(config) {}
|
||||
explicit Worker(const Config &config) : PrivateBase(config) {}
|
||||
IMPL_GETTERS
|
||||
void WaitForShutdown() { coordination_.WaitForShutdown(); }
|
||||
|
||||
@ -112,11 +98,7 @@ class Worker : public Base {
|
||||
distributed::WorkerCoordination coordination_{system_,
|
||||
config_.master_endpoint};
|
||||
tx::WorkerEngine tx_engine_{system_, config_.master_endpoint};
|
||||
Storage storage_{config_.worker_id};
|
||||
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
|
||||
durability::WriteAheadLog wal_{config_.worker_id,
|
||||
config_.durability_directory,
|
||||
config_.durability_enabled};
|
||||
TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{system_,
|
||||
config_.master_endpoint};
|
||||
database::WorkerCounters counters_{system_, config_.master_endpoint};
|
||||
@ -124,16 +106,15 @@ class Worker : public Base {
|
||||
|
||||
#undef IMPL_GETTERS
|
||||
|
||||
} // namespace impl
|
||||
|
||||
GraphDb::GraphDb(std::unique_ptr<impl::Base> impl) : impl_(std::move(impl)) {
|
||||
PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
|
||||
: impl_(std::move(impl)) {
|
||||
if (impl_->config_.durability_enabled)
|
||||
durability::CheckDurabilityDir(impl_->config_.durability_directory);
|
||||
|
||||
if (impl_->config_.db_recover_on_startup)
|
||||
durability::Recover(impl_->config_.durability_directory, *this);
|
||||
durability::Recover(impl_->config_.durability_directory, *impl_);
|
||||
if (impl_->config_.durability_enabled) {
|
||||
wal().Enable();
|
||||
impl_->wal().Enable();
|
||||
snapshot_creator_ = std::make_unique<Scheduler>();
|
||||
snapshot_creator_->Run(
|
||||
std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
|
||||
@ -141,38 +122,30 @@ GraphDb::GraphDb(std::unique_ptr<impl::Base> impl) : impl_(std::move(impl)) {
|
||||
}
|
||||
}
|
||||
|
||||
GraphDb::~GraphDb() {
|
||||
PublicBase::~PublicBase() {
|
||||
snapshot_creator_.release();
|
||||
if (impl_->config_.snapshot_on_exit) MakeSnapshot();
|
||||
}
|
||||
|
||||
int GraphDb::WorkerId() const { return impl_->config_.worker_id; }
|
||||
|
||||
Storage &GraphDb::storage() { return impl_->storage(); }
|
||||
|
||||
durability::WriteAheadLog &GraphDb::wal() { return impl_->wal(); }
|
||||
|
||||
tx::Engine &GraphDb::tx_engine() { return impl_->tx_engine(); }
|
||||
|
||||
ConcurrentIdMapper<Label> &GraphDb::label_mapper() {
|
||||
Storage &PublicBase::storage() { return impl_->storage(); }
|
||||
durability::WriteAheadLog &PublicBase::wal() { return impl_->wal(); }
|
||||
tx::Engine &PublicBase::tx_engine() { return impl_->tx_engine(); }
|
||||
ConcurrentIdMapper<Label> &PublicBase::label_mapper() {
|
||||
return impl_->label_mapper();
|
||||
}
|
||||
|
||||
ConcurrentIdMapper<EdgeType> &GraphDb::edge_type_mapper() {
|
||||
ConcurrentIdMapper<EdgeType> &PublicBase::edge_type_mapper() {
|
||||
return impl_->edge_type_mapper();
|
||||
}
|
||||
|
||||
ConcurrentIdMapper<Property> &GraphDb::property_mapper() {
|
||||
ConcurrentIdMapper<Property> &PublicBase::property_mapper() {
|
||||
return impl_->property_mapper();
|
||||
}
|
||||
database::Counters &PublicBase::counters() { return impl_->counters(); }
|
||||
void PublicBase::CollectGarbage() { impl_->CollectGarbage(); }
|
||||
int PublicBase::WorkerId() const { return impl_->WorkerId(); }
|
||||
|
||||
database::Counters &GraphDb::counters() { return impl_->counters(); }
|
||||
|
||||
void GraphDb::CollectGarbage() { impl_->storage_gc().CollectGarbage(); }
|
||||
|
||||
void GraphDb::MakeSnapshot() {
|
||||
void PublicBase::MakeSnapshot() {
|
||||
const bool status = durability::MakeSnapshot(
|
||||
*this, fs::path(impl_->config_.durability_directory),
|
||||
*impl_, fs::path(impl_->config_.durability_directory),
|
||||
impl_->config_.snapshot_max_retained);
|
||||
if (status) {
|
||||
LOG(INFO) << "Snapshot created successfully." << std::endl;
|
||||
@ -180,26 +153,34 @@ void GraphDb::MakeSnapshot() {
|
||||
LOG(ERROR) << "Snapshot creation failed!" << std::endl;
|
||||
}
|
||||
}
|
||||
} // namespace impl
|
||||
|
||||
MasterBase::MasterBase(std::unique_ptr<impl::Base> impl)
|
||||
: GraphDb(std::move(impl)) {
|
||||
MasterBase::MasterBase(std::unique_ptr<impl::PrivateBase> impl)
|
||||
: PublicBase(std::move(impl)) {
|
||||
if (impl_->config_.query_execution_time_sec != -1) {
|
||||
transaction_killer_.Run(
|
||||
std::chrono::seconds(std::max(
|
||||
1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
|
||||
[this]() {
|
||||
tx_engine().LocalForEachActiveTransaction([this](tx::Transaction &t) {
|
||||
if (t.creation_time() +
|
||||
std::chrono::seconds(
|
||||
impl_->config_.query_execution_time_sec) <
|
||||
std::chrono::steady_clock::now()) {
|
||||
t.set_should_abort();
|
||||
};
|
||||
});
|
||||
impl_->tx_engine().LocalForEachActiveTransaction(
|
||||
[this](tx::Transaction &t) {
|
||||
if (t.creation_time() +
|
||||
std::chrono::seconds(
|
||||
impl_->config_.query_execution_time_sec) <
|
||||
std::chrono::steady_clock::now()) {
|
||||
t.set_should_abort();
|
||||
};
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
MasterBase::~MasterBase() {
|
||||
is_accepting_transactions_ = false;
|
||||
tx_engine().LocalForEachActiveTransaction(
|
||||
[](auto &t) { t.set_should_abort(); });
|
||||
}
|
||||
|
||||
SingleNode::SingleNode(Config config)
|
||||
: MasterBase(std::make_unique<impl::SingleNode>(config)) {}
|
||||
|
||||
@ -207,7 +188,7 @@ Master::Master(Config config)
|
||||
: MasterBase(std::make_unique<impl::Master>(config)) {}
|
||||
|
||||
Worker::Worker(Config config)
|
||||
: GraphDb(std::make_unique<impl::Worker>(config)) {}
|
||||
: PublicBase(std::make_unique<impl::Worker>(config)) {}
|
||||
|
||||
void Worker::WaitForShutdown() {
|
||||
dynamic_cast<impl::Worker *>(impl_.get())->WaitForShutdown();
|
||||
|
@ -39,10 +39,6 @@ struct Config {
|
||||
io::network::Endpoint worker_endpoint;
|
||||
};
|
||||
|
||||
namespace impl {
|
||||
class Base;
|
||||
}
|
||||
|
||||
/**
|
||||
* An abstract base class for a SingleNode/Master/Worker graph db.
|
||||
*
|
||||
@ -68,38 +64,65 @@ class Base;
|
||||
*/
|
||||
class GraphDb {
|
||||
public:
|
||||
Storage &storage();
|
||||
durability::WriteAheadLog &wal();
|
||||
tx::Engine &tx_engine();
|
||||
storage::ConcurrentIdMapper<storage::Label> &label_mapper();
|
||||
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper();
|
||||
storage::ConcurrentIdMapper<storage::Property> &property_mapper();
|
||||
database::Counters &counters();
|
||||
void CollectGarbage();
|
||||
int WorkerId() const;
|
||||
GraphDb() {}
|
||||
virtual ~GraphDb() {}
|
||||
|
||||
virtual Storage &storage() = 0;
|
||||
virtual durability::WriteAheadLog &wal() = 0;
|
||||
virtual tx::Engine &tx_engine() = 0;
|
||||
virtual storage::ConcurrentIdMapper<storage::Label> &label_mapper() = 0;
|
||||
virtual storage::ConcurrentIdMapper<storage::EdgeType>
|
||||
&edge_type_mapper() = 0;
|
||||
virtual storage::ConcurrentIdMapper<storage::Property> &property_mapper() = 0;
|
||||
virtual database::Counters &counters() = 0;
|
||||
virtual void CollectGarbage() = 0;
|
||||
virtual int WorkerId() const = 0;
|
||||
|
||||
GraphDb(const GraphDb &) = delete;
|
||||
GraphDb(GraphDb &&) = delete;
|
||||
GraphDb &operator=(const GraphDb &) = delete;
|
||||
GraphDb &operator=(GraphDb &&) = delete;
|
||||
};
|
||||
|
||||
namespace impl {
|
||||
// Private GraphDb implementations all inherit `PrivateBase`.
|
||||
// Public GraphDb implementations all inherit `PublicBase`.
|
||||
class PrivateBase;
|
||||
|
||||
// Base class for all GraphDb implementations exposes to the client programmer.
|
||||
// Encapsulates an instance of a private implementation of GraphDb and performs
|
||||
// initialization and cleanup.
|
||||
class PublicBase : public GraphDb {
|
||||
public:
|
||||
Storage &storage() override;
|
||||
durability::WriteAheadLog &wal() override;
|
||||
tx::Engine &tx_engine() override;
|
||||
storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
|
||||
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;
|
||||
storage::ConcurrentIdMapper<storage::Property> &property_mapper() override;
|
||||
database::Counters &counters() override;
|
||||
void CollectGarbage() override;
|
||||
int WorkerId() const override;
|
||||
|
||||
protected:
|
||||
explicit GraphDb(std::unique_ptr<impl::Base> impl);
|
||||
virtual ~GraphDb();
|
||||
explicit PublicBase(std::unique_ptr<PrivateBase> impl);
|
||||
~PublicBase();
|
||||
|
||||
std::unique_ptr<impl::Base> impl_;
|
||||
std::unique_ptr<PrivateBase> impl_;
|
||||
|
||||
private:
|
||||
std::unique_ptr<Scheduler> snapshot_creator_;
|
||||
|
||||
void MakeSnapshot();
|
||||
};
|
||||
} // namespace impl
|
||||
|
||||
class MasterBase : public GraphDb {
|
||||
class MasterBase : public impl::PublicBase {
|
||||
public:
|
||||
explicit MasterBase(std::unique_ptr<impl::Base> impl);
|
||||
explicit MasterBase(std::unique_ptr<impl::PrivateBase> impl);
|
||||
bool is_accepting_transactions() const { return is_accepting_transactions_; }
|
||||
|
||||
~MasterBase() {
|
||||
is_accepting_transactions_ = false;
|
||||
tx_engine().LocalForEachActiveTransaction(
|
||||
[](auto &t) { t.set_should_abort(); });
|
||||
}
|
||||
~MasterBase();
|
||||
|
||||
private:
|
||||
/** When this is false, no new transactions should be created. */
|
||||
@ -117,7 +140,7 @@ class Master : public MasterBase {
|
||||
explicit Master(Config config = Config());
|
||||
};
|
||||
|
||||
class Worker : public GraphDb {
|
||||
class Worker : public impl::PublicBase {
|
||||
public:
|
||||
explicit Worker(Config config = Config());
|
||||
void WaitForShutdown();
|
||||
|
Loading…
Reference in New Issue
Block a user