Remove tx::Engine::GlobalIsActive

Summary:
Remove a method in tx::Engine whose results can be obtained from commit
log info (also guaranteed to be globally correct in distributed).

Reviewers: dgleich

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1240
This commit is contained in:
florijan 2018-02-26 11:06:01 +01:00
parent dc21511e6e
commit c8dc07ad0e
10 changed files with 4 additions and 40 deletions

View File

@ -45,8 +45,6 @@ BOOST_CLASS_EXPORT(tx::GcSnapshotReq);
BOOST_CLASS_EXPORT(tx::ClogInfoReq); BOOST_CLASS_EXPORT(tx::ClogInfoReq);
BOOST_CLASS_EXPORT(tx::ClogInfoRes); BOOST_CLASS_EXPORT(tx::ClogInfoRes);
BOOST_CLASS_EXPORT(tx::ActiveTransactionsReq); BOOST_CLASS_EXPORT(tx::ActiveTransactionsReq);
BOOST_CLASS_EXPORT(tx::IsActiveReq);
BOOST_CLASS_EXPORT(tx::IsActiveRes);
// Distributed coordination. // Distributed coordination.
BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq); BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq);

View File

@ -190,7 +190,7 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
db_.storage().index_build_tx_in_progress_.access(); db_.storage().index_build_tx_in_progress_.access();
for (auto id : wait_transactions) { for (auto id : wait_transactions) {
if (active_index_creation_transactions.contains(id)) continue; if (active_index_creation_transactions.contains(id)) continue;
while (transaction_.engine_.GlobalIsActive(id)) { while (transaction_.engine_.Info(id).is_active()) {
// Active index creation set could only now start containing that id, // Active index creation set could only now start containing that id,
// since that thread could have not written to the set set and to avoid // since that thread could have not written to the set set and to avoid
// dead-lock we need to make sure we keep track of that // dead-lock we need to make sure we keep track of that

View File

@ -71,9 +71,6 @@ class Engine {
/** Returns active transactions. */ /** Returns active transactions. */
virtual Snapshot GlobalActiveTransactions() = 0; virtual Snapshot GlobalActiveTransactions() = 0;
/** Returns true if the transaction with the given ID is currently active. */
virtual bool GlobalIsActive(transaction_id_t tx) const = 0;
/** Returns the ID of last locally known transaction. */ /** Returns the ID of last locally known transaction. */
virtual tx::transaction_id_t LocalLast() const = 0; virtual tx::transaction_id_t LocalLast() const = 0;

View File

@ -57,9 +57,5 @@ MasterEngine::MasterEngine(communication::rpc::Server &server,
[this](const communication::rpc::Message &) { [this](const communication::rpc::Message &) {
return std::make_unique<SnapshotRes>(GlobalActiveTransactions()); return std::make_unique<SnapshotRes>(GlobalActiveTransactions());
}); });
rpc_server_.Register<IsActiveRpc>([this](const IsActiveReq &req) {
return std::make_unique<IsActiveRes>(GlobalIsActive(req.member));
});
} }
} // namespace tx } // namespace tx

View File

@ -57,9 +57,4 @@ RPC_NO_MEMBER_MESSAGE(ActiveTransactionsReq)
using ActiveTransactionsRpc = using ActiveTransactionsRpc =
communication::rpc::RequestResponse<ActiveTransactionsReq, SnapshotRes>; communication::rpc::RequestResponse<ActiveTransactionsReq, SnapshotRes>;
RPC_SINGLE_MEMBER_MESSAGE(IsActiveReq, transaction_id_t)
RPC_SINGLE_MEMBER_MESSAGE(IsActiveRes, bool)
using IsActiveRpc =
communication::rpc::RequestResponse<IsActiveReq, IsActiveRes>;
} // namespace tx } // namespace tx

View File

@ -103,10 +103,6 @@ Snapshot SingleNodeEngine::GlobalActiveTransactions() {
return active_transactions; return active_transactions;
} }
bool SingleNodeEngine::GlobalIsActive(transaction_id_t tx) const {
return clog_.is_active(tx);
}
tx::transaction_id_t SingleNodeEngine::LocalLast() const { tx::transaction_id_t SingleNodeEngine::LocalLast() const {
return counter_.load(); return counter_.load();
} }

View File

@ -37,7 +37,6 @@ class SingleNodeEngine : public Engine {
CommitLog::Info Info(transaction_id_t tx) const override; CommitLog::Info Info(transaction_id_t tx) const override;
Snapshot GlobalGcSnapshot() override; Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override; Snapshot GlobalActiveTransactions() override;
bool GlobalIsActive(transaction_id_t tx) const override;
tx::transaction_id_t LocalLast() const override; tx::transaction_id_t LocalLast() const override;
void LocalForEachActiveTransaction( void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override; std::function<void(Transaction &)> f) override;

View File

@ -107,10 +107,6 @@ Snapshot WorkerEngine::GlobalActiveTransactions() {
return std::move(rpc_client_pool_.Call<ActiveTransactionsRpc>()->member); return std::move(rpc_client_pool_.Call<ActiveTransactionsRpc>()->member);
} }
bool WorkerEngine::GlobalIsActive(transaction_id_t tid) const {
return rpc_client_pool_.Call<IsActiveRpc>(tid)->member;
}
transaction_id_t WorkerEngine::LocalLast() const { return local_last_; } transaction_id_t WorkerEngine::LocalLast() const { return local_last_; }
void WorkerEngine::LocalForEachActiveTransaction( void WorkerEngine::LocalForEachActiveTransaction(

View File

@ -34,7 +34,6 @@ class WorkerEngine : public Engine {
CommitLog::Info Info(transaction_id_t tid) const override; CommitLog::Info Info(transaction_id_t tid) const override;
Snapshot GlobalGcSnapshot() override; Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override; Snapshot GlobalActiveTransactions() override;
bool GlobalIsActive(transaction_id_t tid) const override;
transaction_id_t LocalLast() const override; transaction_id_t LocalLast() const override;
void LocalForEachActiveTransaction( void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override; std::function<void(Transaction &)> f) override;

View File

@ -100,19 +100,6 @@ TEST_F(WorkerEngineTest, GlobalActiveTransactions) {
EXPECT_EQ(worker_.GlobalActiveTransactions(), tx::Snapshot({2, 4})); EXPECT_EQ(worker_.GlobalActiveTransactions(), tx::Snapshot({2, 4}));
} }
TEST_F(WorkerEngineTest, GlobalIsActive) {
auto *tx_1 = master_.Begin();
master_.Begin();
auto *tx_3 = master_.Begin();
master_.Begin();
master_.Commit(*tx_1);
master_.Abort(*tx_3);
EXPECT_FALSE(worker_.GlobalIsActive(1));
EXPECT_TRUE(worker_.GlobalIsActive(2));
EXPECT_FALSE(worker_.GlobalIsActive(3));
EXPECT_TRUE(worker_.GlobalIsActive(4));
}
TEST_F(WorkerEngineTest, LocalLast) { TEST_F(WorkerEngineTest, LocalLast) {
master_.Begin(); master_.Begin();
EXPECT_EQ(worker_.LocalLast(), 0); EXPECT_EQ(worker_.LocalLast(), 0);
@ -143,9 +130,10 @@ TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) {
TEST_F(WorkerEngineTest, TxEndListener) { TEST_F(WorkerEngineTest, TxEndListener) {
std::atomic<int> has_expired{0}; std::atomic<int> has_expired{0};
TxEndListener worker_end_listner{ TxEndListener worker_end_listner{
worker_, [&has_expired](transaction_id_t tid) { worker_, [&has_expired](transaction_id_t tid) {
std::cout << "asdasdadas: " << tid << std::endl; std::cout << "asdasdadas: " << tid << std::endl;
++has_expired; }}; ++has_expired;
}};
auto sleep_period = auto sleep_period =
WorkerEngine::kCacheReleasePeriod + std::chrono::milliseconds(200); WorkerEngine::kCacheReleasePeriod + std::chrono::milliseconds(200);