Remove DistributedGraphDb

Summary: DistributedGraphDb is no longer needed.

Reviewers: msantl, teon.banek, ipaljak

Reviewed By: msantl, teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1857
This commit is contained in:
Vinko Kasljevic 2019-02-14 08:36:40 +01:00
parent bb052be002
commit 127a67ab13
20 changed files with 83 additions and 145 deletions

View File

@ -46,7 +46,7 @@ namespace {
// RecordAccessor implementation is shared among different RecordAccessors to
// avoid heap allocations. Therefore, we are constructing this implementation in
// each DistributedGraphDb and pass it to DistributedAccessor.
// each GraphDb and pass it to DistributedAccessor.
template <class TRecord>
class DistributedRecordAccessor final {
// These should never be changed, because this implementation may be shared
@ -260,7 +260,7 @@ class DistributedAccessor : public GraphDbAccessor {
DistributedEdgeAccessor *edge_accessor_;
protected:
DistributedAccessor(DistributedGraphDb *db, tx::TransactionId tx_id,
DistributedAccessor(GraphDb *db, tx::TransactionId tx_id,
DistributedVertexAccessor *vertex_accessor,
DistributedEdgeAccessor *edge_accessor)
: GraphDbAccessor(*db, tx_id),
@ -269,7 +269,7 @@ class DistributedAccessor : public GraphDbAccessor {
vertex_accessor_(vertex_accessor),
edge_accessor_(edge_accessor) {}
DistributedAccessor(DistributedGraphDb *db,
DistributedAccessor(GraphDb *db,
DistributedVertexAccessor *vertex_accessor,
DistributedEdgeAccessor *edge_accessor)
: GraphDbAccessor(*db),
@ -497,7 +497,7 @@ class WorkerAccessor final : public DistributedAccessor {
class DistributedRecoveryTransactions
: public durability::RecoveryTransactions {
public:
explicit DistributedRecoveryTransactions(DistributedGraphDb *db) : db_(db) {}
explicit DistributedRecoveryTransactions(GraphDb *db) : db_(db) {}
void Commit(const tx::TransactionId &tx_id) final {
GetAccessor(tx_id)->Commit();
@ -511,7 +511,7 @@ class DistributedRecoveryTransactions
protected:
virtual GraphDbAccessor *GetAccessor(const tx::TransactionId &tx_id) = 0;
DistributedGraphDb *db_;
GraphDb *db_;
std::unordered_map<tx::TransactionId, std::unique_ptr<GraphDbAccessor>>
accessors_;
};
@ -941,14 +941,12 @@ VertexAccessor InsertVertexIntoRemote(
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, PropertyValue> &properties,
std::experimental::optional<int64_t> cypher_id) {
// TODO: Replace this with virtual call or some other mechanism.
auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba->db());
CHECK(distributed_db);
CHECK(worker_id != distributed_db->WorkerId())
auto *db = &dba->db();
CHECK(db);
CHECK(worker_id != db->WorkerId())
<< "Not allowed to call InsertVertexIntoRemote for local worker";
auto *updates_clients = &distributed_db->updates_clients();
auto *data_manager = &distributed_db->data_manager();
auto *updates_clients = &db->updates_clients();
auto *data_manager = &db->data_manager();
CHECK(updates_clients && data_manager);
auto created_vertex_info = updates_clients->CreateVertex(
worker_id, dba->transaction_id(), labels, properties, cypher_id);

View File

@ -5,42 +5,8 @@
#include "database/distributed/graph_db.hpp"
#include "durability/distributed/version.hpp"
namespace distributed {
class BfsRpcServer;
class BfsRpcClients;
class DataRpcServer;
class DataRpcClients;
class PlanDispatcher;
class PlanConsumer;
class PullRpcClients;
class ProduceRpcServer;
class UpdatesRpcServer;
class UpdatesRpcClients;
class DataManager;
class IndexRpcClients;
} // namespace distributed
namespace database {
namespace impl {
class Master;
class Worker;
} // namespace impl
/// Abstract base class for concrete distributed versions of GraphDb
class DistributedGraphDb : public GraphDb {
public:
virtual int WorkerId() const = 0;
virtual std::vector<int> GetWorkerIds() const = 0;
virtual distributed::BfsRpcClients &bfs_subcursor_clients() = 0;
virtual distributed::DataRpcClients &data_clients() = 0;
virtual distributed::UpdatesRpcServer &updates_server() = 0;
virtual distributed::UpdatesRpcClients &updates_clients() = 0;
virtual distributed::DataManager &data_manager() = 0;
};
class Master final : public DistributedGraphDb {
class Master final : public GraphDb {
public:
explicit Master(Config config = Config());
~Master();
@ -88,7 +54,7 @@ class Master final : public DistributedGraphDb {
std::unique_ptr<utils::Scheduler> snapshot_creator_;
};
class Worker final : public DistributedGraphDb {
class Worker final : public GraphDb {
public:
explicit Worker(Config config = Config());
~Worker();

View File

@ -17,7 +17,26 @@
#include "transactions/distributed/engine.hpp"
#include "utils/scheduler.hpp"
namespace distributed {
class BfsRpcServer;
class BfsRpcClients;
class DataRpcServer;
class DataRpcClients;
class PlanDispatcher;
class PlanConsumer;
class PullRpcClients;
class ProduceRpcServer;
class UpdatesRpcServer;
class UpdatesRpcClients;
class DataManager;
class IndexRpcClients;
} // namespace distributed
namespace database {
namespace impl {
class Master;
class Worker;
} // namespace impl
/// Database configuration. Initialized from flags, but modifiable.
struct Config {
@ -106,59 +125,19 @@ class GraphDb {
/// recovery
virtual void ReinitializeStorage() = 0;
virtual int WorkerId() const = 0;
virtual std::vector<int> GetWorkerIds() const = 0;
virtual distributed::BfsRpcClients &bfs_subcursor_clients() = 0;
virtual distributed::DataRpcClients &data_clients() = 0;
virtual distributed::UpdatesRpcServer &updates_server() = 0;
virtual distributed::UpdatesRpcClients &updates_clients() = 0;
virtual distributed::DataManager &data_manager() = 0;
/// When this is false, no new transactions should be created.
bool is_accepting_transactions() const { return is_accepting_transactions_; }
protected:
std::atomic<bool> is_accepting_transactions_{true};
};
namespace impl {
class SingleNode;
} // namespace impl
class SingleNode final : public GraphDb {
public:
explicit SingleNode(Config config = Config());
~SingleNode();
std::unique_ptr<GraphDbAccessor> Access() override;
std::unique_ptr<GraphDbAccessor> Access(tx::TransactionId) override;
Storage &storage() override;
durability::WriteAheadLog &wal() override;
tx::Engine &tx_engine() override;
storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;
storage::ConcurrentIdMapper<storage::Property> &property_mapper() override;
database::Counters &counters() override;
void CollectGarbage() override;
bool MakeSnapshot(GraphDbAccessor &accessor) override;
void ReinitializeStorage() override;
private:
std::unique_ptr<impl::SingleNode> impl_;
std::unique_ptr<utils::Scheduler> snapshot_creator_;
utils::Scheduler transaction_killer_;
};
class SingleNodeRecoveryTransanctions final
: public durability::RecoveryTransactions {
public:
explicit SingleNodeRecoveryTransanctions(SingleNode *db);
~SingleNodeRecoveryTransanctions();
void Begin(const tx::TransactionId &tx_id) override;
void Abort(const tx::TransactionId &tx_id) override;
void Commit(const tx::TransactionId &tx_id) override;
void Apply(const database::StateDelta &delta) override;
private:
SingleNode *db_;
std::unordered_map<tx::TransactionId, std::unique_ptr<GraphDbAccessor>>
accessors_;
};
} // namespace database

View File

@ -1,12 +1,12 @@
#include "bfs_rpc_clients.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "distributed/bfs_rpc_messages.hpp"
#include "distributed/data_manager.hpp"
namespace distributed {
BfsRpcClients::BfsRpcClients(database::DistributedGraphDb *db,
BfsRpcClients::BfsRpcClients(database::GraphDb *db,
BfsSubcursorStorage *subcursor_storage,
Coordination *coordination,
DataManager *data_manager)

View File

@ -6,7 +6,7 @@
#include "transactions/transaction.hpp"
namespace database {
class DistributedGraphDb;
class GraphDb;
}
namespace distributed {
@ -21,7 +21,7 @@ class DataManager;
/// directly.
class BfsRpcClients {
public:
BfsRpcClients(database::DistributedGraphDb *db,
BfsRpcClients(database::GraphDb *db,
BfsSubcursorStorage *subcursor_storage,
Coordination *coordination, DataManager *data_manager);
@ -63,7 +63,7 @@ class BfsRpcClients {
const std::vector<query::TypedValue> &frame);
private:
database::DistributedGraphDb *db_;
database::GraphDb *db_;
distributed::BfsSubcursorStorage *subcursor_storage_;
distributed::Coordination *coordination_;
distributed::DataManager *data_manager_;

View File

@ -17,7 +17,7 @@ namespace distributed {
/// subcursor storage.
class BfsRpcServer {
public:
BfsRpcServer(database::DistributedGraphDb *db,
BfsRpcServer(database::GraphDb *db,
distributed::Coordination *coordination,
BfsSubcursorStorage *subcursor_storage)
: db_(db), subcursor_storage_(subcursor_storage) {
@ -156,7 +156,7 @@ class BfsRpcServer {
}
private:
database::DistributedGraphDb *db_;
database::GraphDb *db_;
std::mutex lock_;
std::map<tx::TransactionId, std::unique_ptr<database::GraphDbAccessor>>

View File

@ -2,7 +2,7 @@
#include <unordered_map>
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "query/exceptions.hpp"
#include "query/plan/operator.hpp"

View File

@ -2,13 +2,13 @@
#include <memory>
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/data_rpc_messages.hpp"
namespace distributed {
DataRpcServer::DataRpcServer(database::DistributedGraphDb *db,
DataRpcServer::DataRpcServer(database::GraphDb *db,
distributed::Coordination *coordination)
: db_(db) {
coordination->Register<VertexRpc>(

View File

@ -4,7 +4,7 @@
#include "distributed/coordination.hpp"
namespace database {
class DistributedGraphDb;
class GraphDb;
}
namespace distributed {
@ -12,11 +12,11 @@ namespace distributed {
/// Serves this worker's data to others.
class DataRpcServer {
public:
DataRpcServer(database::DistributedGraphDb *db,
DataRpcServer(database::GraphDb *db,
distributed::Coordination *coordination);
private:
database::DistributedGraphDb *db_;
database::GraphDb *db_;
};
} // namespace distributed

View File

@ -4,7 +4,7 @@
#include <unordered_map>
#include <vector>
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "query/exceptions.hpp"
@ -30,7 +30,7 @@ DEFINE_VALIDATED_int32(dgp_max_batch_size, 2000,
namespace distributed::dgp {
Partitioner::Partitioner(database::DistributedGraphDb *db) : db_(db) {}
Partitioner::Partitioner(database::GraphDb *db) : db_(db) {}
std::pair<double, bool> Partitioner::Partition() {
auto dba = db_->Access();

View File

@ -10,7 +10,7 @@
#include "storage/vertex_accessor.hpp"
namespace database {
class DistributedGraphDb;
class GraphDb;
class GraphDbAccessor;
}; // namespace database
@ -49,7 +49,7 @@ class Partitioner {
/// The partitioner needs GraphDb because each partition step is a new
/// database transactions (database accessor has to be created).
/// TODO (buda): Consider passing GraphDbAccessor directly.
explicit Partitioner(database::DistributedGraphDb *db);
explicit Partitioner(database::GraphDb *db);
Partitioner(const Partitioner &other) = delete;
Partitioner(Partitioner &&other) = delete;
@ -83,7 +83,7 @@ class Partitioner {
const VertexAccessor &vertex) const;
private:
database::DistributedGraphDb *db_{nullptr};
database::GraphDb *db_{nullptr};
};
} // namespace distributed::dgp

View File

@ -1,11 +1,11 @@
#include "distributed/dynamic_worker.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "distributed/dynamic_worker_rpc_messages.hpp"
namespace distributed {
DynamicWorkerAddition::DynamicWorkerAddition(database::DistributedGraphDb *db,
DynamicWorkerAddition::DynamicWorkerAddition(database::GraphDb *db,
distributed::Coordination *coordination)
: db_(db), coordination_(coordination) {
coordination_->Register<DynamicWorkerRpc>(

View File

@ -9,20 +9,20 @@
#include "distributed/coordination.hpp"
namespace database {
class DistributedGraphDb;
class GraphDb;
} // namespace database
namespace distributed {
class DynamicWorkerAddition final {
public:
DynamicWorkerAddition(database::DistributedGraphDb *db,
DynamicWorkerAddition(database::GraphDb *db,
distributed::Coordination *coordination);
/// Enable dynamic worker addition.
void Enable();
private:
database::DistributedGraphDb *db_{nullptr};
database::GraphDb *db_{nullptr};
distributed::Coordination *coordination_;
std::atomic<bool> enabled_{false};

View File

@ -18,7 +18,7 @@ cpp<#
(lcp:in-impl
#>cpp
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/data_manager.hpp"
cpp<#)

View File

@ -6,7 +6,7 @@
#include "distributed/dgp/partitioner.hpp"
namespace database {
class DistributedGraphDb;
class GraphDb;
};
namespace distributed {
@ -23,7 +23,7 @@ namespace distributed {
/// step in the same time.
class TokenSharingRpcServer {
public:
TokenSharingRpcServer(database::DistributedGraphDb *db, int worker_id,
TokenSharingRpcServer(database::GraphDb *db, int worker_id,
distributed::Coordination *coordination)
: worker_id_(worker_id), coordination_(coordination), dgp_(db) {
coordination_->Register<distributed::TokenTransferRpc>(

View File

@ -176,7 +176,7 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
return UpdateResult::DONE;
}
UpdatesRpcServer::UpdatesRpcServer(database::DistributedGraphDb *db,
UpdatesRpcServer::UpdatesRpcServer(database::GraphDb *db,
distributed::Coordination *coordination)
: db_(db) {
coordination->Register<UpdateRpc>([this](const auto &req_reader,

View File

@ -34,7 +34,7 @@ class UpdatesRpcServer {
template <typename TRecordAccessor>
class TransactionUpdates {
public:
TransactionUpdates(database::DistributedGraphDb *db,
TransactionUpdates(database::GraphDb *db,
tx::TransactionId tx_id)
: db_accessor_(db->Access(tx_id)) {}
@ -75,7 +75,7 @@ class UpdatesRpcServer {
};
public:
UpdatesRpcServer(database::DistributedGraphDb *db,
UpdatesRpcServer(database::GraphDb *db,
distributed::Coordination *coordination);
/// Applies all existsing updates for the given transaction ID. If there are
@ -88,7 +88,7 @@ class UpdatesRpcServer {
void ClearTransactionalCache(tx::TransactionId oldest_active);
private:
database::DistributedGraphDb *db_;
database::GraphDb *db_;
template <typename TAccessor>
using MapT = ConcurrentMap<tx::TransactionId, TransactionUpdates<TAccessor>>;

View File

@ -1,6 +1,6 @@
#include "query/plan/distributed_ops.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "distributed/pull_produce_rpc_messages.hpp"
#include "distributed/pull_rpc_clients.hpp"
@ -1062,11 +1062,9 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
void InitSubcursors(database::GraphDbAccessor *dba,
const query::SymbolTable &symbol_table,
const EvaluationContext &evaluation_context) {
// TODO: Pass in a DistributedGraphDb.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba->db())) {
bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients();
}
auto *db = &dba->db();
bfs_subcursor_clients_ = &db->bfs_subcursor_clients();
CHECK(bfs_subcursor_clients_);
subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors(
dba, self_.common_.direction, self_.common_.edge_types,
@ -1240,7 +1238,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
};
// Returns a random worker id. Worker ID is obtained from the Db.
int RandomWorkerId(const database::DistributedGraphDb &db) {
int RandomWorkerId(const database::GraphDb &db) {
thread_local std::mt19937 gen_{std::random_device{}()};
thread_local std::uniform_int_distribution<int> rand_;
@ -1254,9 +1252,8 @@ VertexAccessor &CreateVertexOnWorker(int worker_id,
Frame &frame, ExecutionContext &context) {
auto &dba = *context.db_accessor;
auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba.db());
int current_worker_id = distributed_db->WorkerId();
auto *db = &dba.db();
int current_worker_id = db->WorkerId();
if (worker_id == current_worker_id)
return CreateLocalVertex(node_info, &frame, context);
@ -1289,8 +1286,7 @@ class DistributedCreateNodeCursor : public query::plan::Cursor {
DistributedCreateNodeCursor(const DistributedCreateNode *self,
database::GraphDbAccessor *dba)
: input_cursor_(self->input()->MakeCursor(*dba)),
// TODO: Replace this with some other mechanism
db_(dynamic_cast<database::DistributedGraphDb *>(&dba->db())),
db_(&dba->db()),
node_info_(self->node_info_),
on_random_worker_(self->on_random_worker_) {
CHECK(db_);
@ -1314,7 +1310,7 @@ class DistributedCreateNodeCursor : public query::plan::Cursor {
private:
std::unique_ptr<query::plan::Cursor> input_cursor_;
database::DistributedGraphDb *db_{nullptr};
database::GraphDb *db_{nullptr};
NodeCreationInfo node_info_;
bool on_random_worker_{false};
};
@ -1325,8 +1321,7 @@ class DistributedCreateExpandCursor : public query::plan::Cursor {
database::GraphDbAccessor *dba)
: input_cursor_(self->input()->MakeCursor(*dba)),
self_(self),
// TODO: Replace this with some other mechanism
db_(dynamic_cast<database::DistributedGraphDb *>(&dba->db())) {
db_(&dba->db()) {
CHECK(db_);
}
@ -1400,7 +1395,7 @@ class DistributedCreateExpandCursor : public query::plan::Cursor {
private:
std::unique_ptr<query::plan::Cursor> input_cursor_;
const DistributedCreateExpand *self_{nullptr};
database::DistributedGraphDb *db_{nullptr};
database::GraphDb *db_{nullptr};
};
} // namespace

View File

@ -5,7 +5,7 @@
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/data_manager.hpp"
#include "distributed/updates_rpc_server.hpp"

View File

@ -290,7 +290,7 @@ class DistributedDetachDeleteTest : public DistributedGraphDbTest {
std::vector<std::reference_wrapper<database::GraphDbAccessor>> dba{
*dba0, *dba1, *dba2};
std::vector<database::DistributedGraphDb *> dbs{&master(), &worker(1),
std::vector<database::GraphDb *> dbs{&master(), &worker(1),
&worker(2)};
auto &accessor = dba[i].get();