Split database/counters into single node and distributed

Reviewers: msantl, vkasljevic, mferencevic

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1580
This commit is contained in:
Teon Banek 2018-08-31 10:32:53 +02:00
parent ceffaf3d85
commit 5c69ae8a0c
12 changed files with 116 additions and 64 deletions

View File

@ -14,7 +14,7 @@ add_subdirectory(auth)
set(memgraph_src_files
data_structures/concurrent/skiplist_gc.cpp
database/config.cpp
database/counters.cpp
database/distributed_counters.cpp
database/distributed_graph_db.cpp
database/graph_db.cpp
database/graph_db_accessor.cpp

View File

@ -1,13 +1,9 @@
/** @file */
#pragma once
#include <chrono>
#include <cstdint>
#include <string>
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
namespace database {
/** A set of counter that are guaranteed to produce unique, consecutive values
@ -31,35 +27,4 @@ class Counters {
virtual void Set(const std::string &name, int64_t values) = 0;
};
/** Implementation for the single-node memgraph */
class SingleNodeCounters : public Counters {
public:
int64_t Get(const std::string &name) override;
void Set(const std::string &name, int64_t value) override;
private:
ConcurrentMap<std::string, std::atomic<int64_t>> counters_;
};
/** Implementation for distributed master. */
class MasterCounters : public SingleNodeCounters {
public:
explicit MasterCounters(communication::rpc::Server &server);
private:
communication::rpc::Server &rpc_server_;
};
/** Implementation for distributed worker. */
class WorkerCounters : public Counters {
public:
explicit WorkerCounters(communication::rpc::ClientPool &master_client_pool);
int64_t Get(const std::string &name) override;
void Set(const std::string &name, int64_t value) override;
private:
communication::rpc::ClientPool &master_client_pool_;
};
} // namespace database

View File

@ -1,47 +1,49 @@
#include "database/counters.hpp"
#include "database/distributed_counters.hpp"
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "database/counters_rpc_messages.hpp"
namespace database {
int64_t SingleNodeCounters::Get(const std::string &name) {
return counters_.access()
.emplace(name, std::make_tuple(name), std::make_tuple(0))
.first->second.fetch_add(1);
}
void SingleNodeCounters::Set(const std::string &name, int64_t value) {
auto name_counter_pair = counters_.access().emplace(
name, std::make_tuple(name), std::make_tuple(value));
if (!name_counter_pair.second) name_counter_pair.first->second.store(value);
}
MasterCounters::MasterCounters(communication::rpc::Server &server)
MasterCounters::MasterCounters(communication::rpc::Server *server)
: rpc_server_(server) {
rpc_server_.Register<CountersGetRpc>(
rpc_server_->Register<CountersGetRpc>(
[this](const auto &req_reader, auto *res_builder) {
CountersGetRes res(Get(req_reader.getName()));
res.Save(res_builder);
});
rpc_server_.Register<CountersSetRpc>(
rpc_server_->Register<CountersSetRpc>(
[this](const auto &req_reader, auto *res_builder) {
Set(req_reader.getName(), req_reader.getValue());
return std::make_unique<CountersSetRes>();
});
}
int64_t MasterCounters::Get(const std::string &name) {
return counters_.access()
.emplace(name, std::make_tuple(name), std::make_tuple(0))
.first->second.fetch_add(1);
}
void MasterCounters::Set(const std::string &name, int64_t value) {
auto name_counter_pair = counters_.access().emplace(
name, std::make_tuple(name), std::make_tuple(value));
if (!name_counter_pair.second) name_counter_pair.first->second.store(value);
}
WorkerCounters::WorkerCounters(
communication::rpc::ClientPool &master_client_pool)
communication::rpc::ClientPool *master_client_pool)
: master_client_pool_(master_client_pool) {}
int64_t WorkerCounters::Get(const std::string &name) {
auto response = master_client_pool_.Call<CountersGetRpc>(name);
auto response = master_client_pool_->Call<CountersGetRpc>(name);
CHECK(response) << "CountersGetRpc failed";
return response->value;
}
void WorkerCounters::Set(const std::string &name, int64_t value) {
auto response = master_client_pool_.Call<CountersSetRpc>(name, value);
auto response = master_client_pool_->Call<CountersSetRpc>(name, value);
CHECK(response) << "CountersSetRpc failed";
}

View File

@ -0,0 +1,43 @@
/// @file
#pragma once
#include <atomic>
#include <cstdint>
#include <string>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/counters.hpp"
namespace communication::rpc {
class Server;
class ClientPool;
} // namespace communication::rpc
namespace database {
/// Implementation for distributed master
class MasterCounters : public Counters {
public:
explicit MasterCounters(communication::rpc::Server *server);
int64_t Get(const std::string &name) override;
void Set(const std::string &name, int64_t value) override;
private:
communication::rpc::Server *rpc_server_;
ConcurrentMap<std::string, std::atomic<int64_t>> counters_;
};
/// Implementation for distributed worker
class WorkerCounters : public Counters {
public:
explicit WorkerCounters(communication::rpc::ClientPool *master_client_pool);
int64_t Get(const std::string &name) override;
void Set(const std::string &name, int64_t value) override;
private:
communication::rpc::ClientPool *master_client_pool_;
};
} // namespace database

View File

@ -1,5 +1,6 @@
#include "database/distributed_graph_db.hpp"
#include "database/distributed_counters.hpp"
#include "database/storage_gc_master.hpp"
#include "database/storage_gc_worker.hpp"
#include "distributed/bfs_rpc_clients.hpp"
@ -563,7 +564,7 @@ class Master {
*storage_, tx_engine_, config_.gc_cycle_sec, server_, coordination_);
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
TypemapPack<storage::MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{server_};
database::MasterCounters counters_{&server_};
distributed::BfsSubcursorStorage subcursor_storage_{self_,
&bfs_subcursor_clients_};
distributed::BfsRpcServer bfs_subcursor_server_{self_, &server_,
@ -872,7 +873,7 @@ class Worker {
rpc_worker_clients_.GetClientPool(0), config_.worker_id);
TypemapPack<storage::WorkerConcurrentIdMapper> typemap_pack_{
rpc_worker_clients_.GetClientPool(0)};
database::WorkerCounters counters_{rpc_worker_clients_.GetClientPool(0)};
database::WorkerCounters counters_{&rpc_worker_clients_.GetClientPool(0)};
distributed::BfsSubcursorStorage subcursor_storage_{self_,
&bfs_subcursor_clients_};
distributed::BfsRpcServer bfs_subcursor_server_{self_, &server_,

View File

@ -5,6 +5,7 @@
#include <glog/logging.h>
#include "database/graph_db_accessor.hpp"
#include "database/single_node_counters.hpp"
#include "database/storage_gc_single_node.hpp"
#include "durability/paths.hpp"
#include "durability/recovery.hpp"

View File

@ -0,0 +1,32 @@
/// @file
#pragma once
#include <atomic>
#include <cstdint>
#include <string>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/counters.hpp"
namespace database {
/// Implementation for the single-node memgraph
class SingleNodeCounters : public Counters {
public:
int64_t Get(const std::string &name) override {
return counters_.access()
.emplace(name, std::make_tuple(name), std::make_tuple(0))
.first->second.fetch_add(1);
}
void Set(const std::string &name, int64_t value) override {
auto name_counter_pair = counters_.access().emplace(
name, std::make_tuple(name), std::make_tuple(value));
if (!name_counter_pair.second) name_counter_pair.first->second.store(value);
}
private:
ConcurrentMap<std::string, std::atomic<int64_t>> counters_;
};
} // namespace database

View File

@ -1,7 +1,9 @@
/// @file
#pragma once
#include <mutex>
#include "communication/rpc/server.hpp"
#include "database/storage_gc.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/storage_gc_rpc_messages.hpp"

View File

@ -1,7 +1,9 @@
#include "distributed/index_rpc_server.hpp"
#include "communication/rpc/server.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/index_rpc_messages.hpp"
#include "distributed/index_rpc_server.hpp"
namespace distributed {

View File

@ -9,6 +9,7 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/server.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db.hpp"
#include "integrations/kafka/exceptions.hpp"

View File

@ -1,3 +1,4 @@
/// @file
#pragma once
#include <csignal>
@ -10,6 +11,7 @@
#include "auth/auth.hpp"
#include "communication/bolt/v1/session.hpp"
#include "communication/session.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "query/interpreter.hpp"
#include "query/transaction_engine.hpp"

View File

@ -1,17 +1,18 @@
#include "gtest/gtest.h"
#include <gtest/gtest.h>
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "database/counters.hpp"
#include "database/distributed_counters.hpp"
const std::string kLocal = "127.0.0.1";
TEST(CountersDistributed, All) {
communication::rpc::Server master_server({kLocal, 0});
database::MasterCounters master(master_server);
database::MasterCounters master(&master_server);
communication::rpc::ClientPool master_client_pool(master_server.endpoint());
database::WorkerCounters w1(master_client_pool);
database::WorkerCounters w2(master_client_pool);
database::WorkerCounters w1(&master_client_pool);
database::WorkerCounters w2(&master_client_pool);
EXPECT_EQ(w1.Get("a"), 0);
EXPECT_EQ(w1.Get("a"), 1);