Two phase commit on cursor destruction

Summary:
When commiting/aborting a transaction in tx master engine, make a two
phase commit to all workers so they can stop all futures and clear
transactional cache.

Reviewers: dgleich, florijan

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1320
This commit is contained in:
Matija Santl 2018-03-21 10:26:43 +01:00
parent 485e1988c3
commit 0bcf2edeae
16 changed files with 175 additions and 54 deletions

View File

@ -10,6 +10,7 @@
#include "distributed/index_rpc_messages.hpp"
#include "distributed/plan_rpc_messages.hpp"
#include "distributed/pull_produce_rpc_messages.hpp"
#include "distributed/transactional_cache_cleaner_rpc_messages.hpp"
#include "distributed/updates_rpc_messages.hpp"
#include "stats/stats_rpc_messages.hpp"
#include "storage/concurrent_id_mapper_rpc_messages.hpp"
@ -115,3 +116,7 @@ BOOST_CLASS_EXPORT(distributed::RemoveEdgeRes);
BOOST_CLASS_EXPORT(distributed::RemoveInEdgeData);
BOOST_CLASS_EXPORT(distributed::RemoveInEdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoveInEdgeRes);
// Transactional Cache Cleaner.
BOOST_CLASS_EXPORT(distributed::WaitOnTransactionEndReq);
BOOST_CLASS_EXPORT(distributed::WaitOnTransactionEndRes);

View File

@ -142,10 +142,7 @@ class SingleNode : public PrivateBase {
class Master : public PrivateBase {
public:
explicit Master(const Config &config) : PrivateBase(config) {
cache_cleaner_.Register(updates_server_);
cache_cleaner_.Register(data_manager_);
}
explicit Master(const Config &config) : PrivateBase(config) {}
GraphDb::Type type() const override {
return GraphDb::Type::DISTRIBUTED_MASTER;
@ -162,10 +159,10 @@ class Master : public PrivateBase {
communication::rpc::Server server_{
config_.master_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
tx::MasterEngine tx_engine_{server_, &wal_};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
tx::MasterEngine tx_engine_{server_, rpc_worker_clients_, &wal_};
distributed::MasterCoordination coordination_{server_};
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
TypemapPack<MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{server_};
distributed::DataRpcServer data_server_{*this, server_};
@ -176,17 +173,14 @@ class Master : public PrivateBase {
distributed::UpdatesRpcServer updates_server_{*this, server_};
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
distributed::DataManager data_manager_{storage_, data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_};
distributed::TransactionalCacheCleaner cache_cleaner_{
tx_engine_, updates_server_, data_manager_};
};
class Worker : public PrivateBase {
public:
explicit Worker(const Config &config) : PrivateBase(config) {
coordination_.RegisterWorker(config.worker_id);
cache_cleaner_.Register(tx_engine_);
cache_cleaner_.Register(produce_server_);
cache_cleaner_.Register(updates_server_);
cache_cleaner_.Register(data_manager_);
}
GraphDb::Type type() const override {
@ -225,7 +219,8 @@ class Worker : public PrivateBase {
distributed::UpdatesRpcServer updates_server_{*this, server_};
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
distributed::DataManager data_manager_{storage_, data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_};
distributed::WorkerTransactionalCacheCleaner cache_cleaner_{
tx_engine_, server_, produce_server_, updates_server_, data_manager_};
};
#undef IMPL_GETTERS

View File

@ -36,7 +36,7 @@ class DataManager {
void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id);
/// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::CacheCleaner`.
/// this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(tx::transaction_id_t oldest_active);
private:

View File

@ -13,10 +13,10 @@ ProduceRpcServer::OngoingProduce::OngoingProduce(
query::SymbolTable symbol_table, Parameters parameters,
std::vector<query::Symbol> pull_symbols)
: dba_{db, tx_id},
cursor_(op->MakeCursor(dba_)),
context_(dba_),
pull_symbols_(std::move(pull_symbols)),
frame_(symbol_table.max_position()) {
frame_(symbol_table.max_position()),
cursor_(op->MakeCursor(dba_)) {
context_.symbol_table_ = std::move(symbol_table);
context_.parameters_ = std::move(parameters);
}
@ -108,22 +108,24 @@ ProduceRpcServer::ProduceRpcServer(
});
}
void ProduceRpcServer::ClearTransactionalCache(
tx::transaction_id_t oldest_active) {
auto access = ongoing_produces_.access();
for (auto &kv : access) {
if (kv.first.first < oldest_active) {
access.remove(kv.first);
void ProduceRpcServer::FinishAndClearOngoingProducePlans(
tx::transaction_id_t tx_id) {
std::lock_guard<std::mutex> guard{ongoing_produces_lock_};
for (auto it = ongoing_produces_.begin(); it != ongoing_produces_.end();) {
if (it->first.first == tx_id) {
it = ongoing_produces_.erase(it);
} else {
++it;
}
}
}
ProduceRpcServer::OngoingProduce &ProduceRpcServer::GetOngoingProduce(
const PullReq &req) {
auto access = ongoing_produces_.access();
auto key_pair = std::make_pair(req.tx_id, req.plan_id);
auto found = access.find(key_pair);
if (found != access.end()) {
std::lock_guard<std::mutex> guard{ongoing_produces_lock_};
auto found = ongoing_produces_.find(key_pair);
if (found != ongoing_produces_.end()) {
return found->second;
}
if (db_.type() == database::GraphDb::Type::DISTRIBUTED_WORKER) {
@ -132,8 +134,8 @@ ProduceRpcServer::OngoingProduce &ProduceRpcServer::GetOngoingProduce(
.RunningTransaction(req.tx_id, req.tx_snapshot);
}
auto &plan_pack = plan_consumer_.PlanForId(req.plan_id);
return access
.emplace(key_pair, std::forward_as_tuple(key_pair),
return ongoing_produces_
.emplace(std::piecewise_construct, std::forward_as_tuple(key_pair),
std::forward_as_tuple(db_, req.tx_id, plan_pack.plan,
plan_pack.symbol_table, req.params,
req.symbols))

View File

@ -1,11 +1,12 @@
#pragma once
#include <cstdint>
#include <map>
#include <mutex>
#include <utility>
#include <vector>
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/plan_consumer.hpp"
@ -47,12 +48,12 @@ class ProduceRpcServer {
private:
database::GraphDbAccessor dba_;
std::unique_ptr<query::plan::Cursor> cursor_;
query::Context context_;
std::vector<query::Symbol> pull_symbols_;
query::Frame frame_;
PullState cursor_state_{PullState::CURSOR_IN_PROGRESS};
std::vector<std::vector<query::TypedValue>> accumulation_;
std::unique_ptr<query::plan::Cursor> cursor_;
/// Pulls and returns a single result from the cursor.
std::pair<std::vector<query::TypedValue>, PullState> PullOneFromCursor();
@ -63,16 +64,18 @@ class ProduceRpcServer {
communication::rpc::Server &server,
const distributed::PlanConsumer &plan_consumer);
/// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(tx::transaction_id_t oldest_active);
/// Finish and clear ongoing produces for all plans that are tied to a
/// transaction with tx_id.
void FinishAndClearOngoingProducePlans(tx::transaction_id_t tx_id);
private:
std::mutex ongoing_produces_lock_;
/// Mapping of (tx id, plan id) to OngoingProduce.
std::map<std::pair<tx::transaction_id_t, int64_t>, OngoingProduce>
ongoing_produces_;
database::GraphDb &db_;
communication::rpc::Server &produce_rpc_server_;
const distributed::PlanConsumer &plan_consumer_;
ConcurrentMap<std::pair<tx::transaction_id_t, int64_t>, OngoingProduce>
ongoing_produces_;
tx::Engine &tx_engine_;
/// Gets an ongoing produce for the given pull request. Creates a new one if

View File

@ -7,6 +7,7 @@
#include "communication/rpc/client_pool.hpp"
#include "distributed/coordination.hpp"
#include "distributed/index_rpc_messages.hpp"
#include "distributed/transactional_cache_cleaner_rpc_messages.hpp"
#include "storage/types.hpp"
#include "transactions/transaction.hpp"
@ -98,4 +99,35 @@ class IndexRpcClients {
RpcWorkerClients &clients_;
};
/** Join ongoing produces on all workers.
*
* Sends a RPC request to all workers when a transaction is ending, notifying
* them to end all ongoing produces tied to that transaction.
*/
class OngoingProduceJoinerRpcClients {
public:
OngoingProduceJoinerRpcClients(RpcWorkerClients &clients)
: clients_(clients) {}
void JoinOngoingProduces(tx::transaction_id_t tx_id) {
auto futures = clients_.ExecuteOnWorkers<void>(
0, [tx_id](communication::rpc::ClientPool &client_pool) {
auto result =
client_pool.Call<distributed::WaitOnTransactionEndRpc>(tx_id);
CHECK(result)
<< "[WaitOnTransactionEndRpc] failed to notify that transaction "
<< tx_id << " ended";
});
// We need to wait for all workers to destroy pending futures to avoid using
// already destroyed (released) transaction objects.
for (auto &future : futures) {
future.wait();
}
}
private:
RpcWorkerClients &clients_;
};
} // namespace distributed

View File

@ -3,27 +3,33 @@
#include <functional>
#include <vector>
#include "communication/rpc/server.hpp"
#include "distributed/produce_rpc_server.hpp"
#include "distributed/transactional_cache_cleaner_rpc_messages.hpp"
#include "transactions/engine.hpp"
#include "transactions/engine_worker.hpp"
#include "utils/scheduler.hpp"
namespace distributed {
/// Periodically calls `ClearCache(oldest_transaction)` on all registered
/// functions.
/// Periodically calls `ClearTransactionalCache(oldest_transaction)` on all
/// registered objects.
class TransactionalCacheCleaner {
/// The wait time between two releases of local transaction objects that have
/// expired on the master.
static constexpr std::chrono::seconds kCacheReleasePeriod{1};
public:
TransactionalCacheCleaner(tx::Engine &tx_engine) : tx_engine_(tx_engine) {
template <typename... T>
TransactionalCacheCleaner(tx::Engine &tx_engine, T &... caches)
: tx_engine_(tx_engine) {
Register(caches...);
cache_clearing_scheduler_.Run(
"DistrTxCacheGc", kCacheReleasePeriod, [this]() {
auto oldest_active = tx_engine_.LocalOldestActive();
for (auto &f : functions_) f(oldest_active);
});
"DistrTxCacheGc", kCacheReleasePeriod,
[this]() { this->Clear(tx_engine_.GlobalGcSnapshot().back()); });
}
protected:
/// Registers the given object for transactional cleaning. The object will
/// periodically get it's `ClearCache(tx::transaction_id_t)` method called
/// with the oldest active transaction id. Note that the ONLY guarantee for
@ -37,9 +43,46 @@ class TransactionalCacheCleaner {
}
private:
template <typename TCache, typename... T>
void Register(TCache &cache, T &... caches) {
Register(cache);
Register(caches...);
}
void Clear(tx::transaction_id_t oldest_active) {
for (auto &f : functions_) f(oldest_active);
}
tx::Engine &tx_engine_;
std::vector<std::function<void(tx::transaction_id_t &oldest_active)>>
functions_;
Scheduler cache_clearing_scheduler_;
};
/// Registers a RPC server that listens for `WaitOnTransactionEnd` requests
/// that require all ongoing produces to finish. It also periodically calls
/// `ClearTransactionalCache` on all registered objects.
class WorkerTransactionalCacheCleaner : public TransactionalCacheCleaner {
public:
template <class... T>
WorkerTransactionalCacheCleaner(tx::WorkerEngine &tx_engine,
communication::rpc::Server &server,
ProduceRpcServer &produce_server,
T &... caches)
: TransactionalCacheCleaner(tx_engine, caches...),
rpc_server_(server),
produce_server_(produce_server) {
Register(tx_engine);
rpc_server_.Register<WaitOnTransactionEndRpc>(
[this](const WaitOnTransactionEndReq &req) {
produce_server_.FinishAndClearOngoingProducePlans(req.member);
return std::make_unique<WaitOnTransactionEndRes>();
});
}
private:
communication::rpc::Server &rpc_server_;
ProduceRpcServer &produce_server_;
};
} // namespace distributed

View File

@ -0,0 +1,13 @@
#pragma once
#include "communication/rpc/messages.hpp"
#include "transactions/type.hpp"
namespace distributed {
RPC_SINGLE_MEMBER_MESSAGE(WaitOnTransactionEndReq, tx::transaction_id_t);
RPC_NO_MEMBER_MESSAGE(WaitOnTransactionEndRes);
using WaitOnTransactionEndRpc =
communication::rpc::RequestResponse<WaitOnTransactionEndReq,
WaitOnTransactionEndRes>;
};

View File

@ -335,9 +335,8 @@ UpdateResult UpdatesRpcServer::RemoveEdge(const RemoveEdgeData &data) {
}
template <>
VertexAccessor
UpdatesRpcServer::TransactionUpdates<VertexAccessor>::FindAccessor(
gid::Gid gid) {
VertexAccessor UpdatesRpcServer::TransactionUpdates<
VertexAccessor>::FindAccessor(gid::Gid gid) {
return db_accessor_.FindVertex(gid, false);
}

View File

@ -76,8 +76,8 @@ class UpdatesRpcServer {
/// after applying them, regardless of the result.
UpdateResult Apply(tx::transaction_id_t tx_id);
/// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::CacheCleaner`.
/// Clears the cache of local transactions that are completed. The signature
/// of this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(tx::transaction_id_t oldest_active);
private:

View File

@ -843,11 +843,13 @@ class Expand : public LogicalOperator, public ExpandCommon {
std::experimental::optional<InEdgeIteratorT> in_edges_it_;
std::experimental::optional<OutEdgeT> out_edges_;
std::experimental::optional<OutEdgeIteratorT> out_edges_it_;
// Edges which are being asynchronously fetched from a remote worker.
std::vector<FutureExpand> future_expands_;
// Stores the last frame before we yield the frame for future edge. It needs
// to be restored afterward.
std::vector<TypedValue> last_frame_;
// Edges which are being asynchronously fetched from a remote worker.
// NOTE: This should be destructed first to ensure that no invalid
// references or pointers exists to other objects of this class.
std::vector<FutureExpand> future_expands_;
bool InitEdges(Frame &, Context &);
};

View File

@ -10,8 +10,11 @@
namespace tx {
MasterEngine::MasterEngine(communication::rpc::Server &server,
distributed::RpcWorkerClients &rpc_worker_clients,
durability::WriteAheadLog *wal)
: SingleNodeEngine(wal), rpc_server_(server) {
: SingleNodeEngine(wal),
rpc_server_(server),
ongoing_produce_joiner_(rpc_worker_clients) {
rpc_server_.Register<BeginRpc>([this](const BeginReq &) {
auto tx = Begin();
return std::make_unique<BeginRes>(TxAndSnapshot{tx->id_, tx->snapshot()});
@ -64,8 +67,19 @@ MasterEngine::MasterEngine(communication::rpc::Server &server,
return std::make_unique<EnsureNextIdGreaterRes>();
});
rpc_server_.Register<GlobalLastRpc>([this](const GlobalLastReq &req) {
rpc_server_.Register<GlobalLastRpc>([this](const GlobalLastReq &) {
return std::make_unique<GlobalLastRes>(GlobalLast());
});
}
void MasterEngine::Commit(const Transaction &t) {
ongoing_produce_joiner_.JoinOngoingProduces(t.id_);
SingleNodeEngine::Commit(t);
}
void MasterEngine::Abort(const Transaction &t) {
ongoing_produce_joiner_.JoinOngoingProduces(t.id_);
SingleNodeEngine::Abort(t);
}
} // namespace tx

View File

@ -1,6 +1,7 @@
#pragma once
#include "communication/rpc/server.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "transactions/engine_single_node.hpp"
namespace tx {
@ -10,13 +11,20 @@ namespace tx {
class MasterEngine : public SingleNodeEngine {
public:
/**
* @param server - Required. Used for rpc::Server construction.
* @param rpc_worker_clients - Required. Used for
* OngoingProduceJoinerRpcClients construction.
* @param wal - Optional. If present, the Engine will write tx
* Begin/Commit/Abort atomically (while under lock).
*/
MasterEngine(communication::rpc::Server &server,
distributed::RpcWorkerClients &rpc_worker_clients,
durability::WriteAheadLog *wal = nullptr);
void Commit(const Transaction &t) override;
void Abort(const Transaction &t) override;
private:
communication::rpc::Server &rpc_server_;
distributed::OngoingProduceJoinerRpcClients ongoing_produce_joiner_;
};
} // namespace tx

View File

@ -7,8 +7,8 @@
#include <glog/logging.h>
#include "database/graph_db.hpp"
#include "query/repl.hpp"
#include "query/interpreter.hpp"
#include "query/repl.hpp"
#include "utils/flag_validation.hpp"
DEFINE_VALIDATED_int32(worker_count, 1,
@ -56,6 +56,6 @@ int main(int argc, char *argv[]) {
// Start the REPL
query::Repl(*master);
master.release();
master = nullptr;
return 0;
}

View File

@ -58,8 +58,9 @@ class DistributedGraphDbTest : public ::testing::Test {
void TearDown() override {
// Kill master first because it will expect a shutdown response from the
// workers.
master_ = nullptr;
auto t = std::thread([this]() { master_ = nullptr; });
for (int i = kWorkerCount - 1; i >= 0; --i) workers_[i] = nullptr;
if (t.joinable()) t.join();
}
database::Master &master() { return *master_; }

View File

@ -6,6 +6,7 @@
#include "gtest/gtest.h"
#include "communication/rpc/server.hpp"
#include "distributed/coordination_master.hpp"
#include "io/network/endpoint.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/engine_rpc_messages.hpp"
@ -13,13 +14,17 @@
using namespace tx;
using namespace communication::rpc;
using namespace distributed;
class WorkerEngineTest : public testing::Test {
protected:
const std::string local{"127.0.0.1"};
Server master_server_{{local, 0}};
MasterEngine master_{master_server_};
MasterCoordination master_coordination_{master_server_};
RpcWorkerClients rpc_worker_clients_{master_coordination_};
MasterEngine master_{master_server_, rpc_worker_clients_};
ClientPool master_client_pool{master_server_.endpoint()};
WorkerEngine worker_{master_client_pool};
@ -140,4 +145,3 @@ TEST_F(WorkerEngineTest, GlobalNext) {
EXPECT_EQ(master_.LocalLast(), worker_.GlobalLast());
EXPECT_EQ(worker_.GlobalLast(), tx->id_);
}