memgraph/tests/unit/transaction_engine_single_node.cpp
Matija Santl 800db5058e Add blocking transactions for index creation
Summary:
A blocking transaction has the ability to stop the transaction engine from
starting new transactions (regular or blocking) and to wait for all other active
transactions to finish (to become non-active, committed or aborted). One thing
that blocking transactions support is defining the parent transaction which
does not need to end in order for the blocking one to start. This is because of
a use case where we start nested transactions.

One could think we should build indexes inside those blocking transactions. This
is true and I wanted to implement this, but this would require some digging in
the interpreter which I didn't want to do in this change.

Reviewers: mferencevic, vkasljevic, teon.banek

Reviewed By: mferencevic, teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1695
2018-10-24 16:31:50 +02:00

154 lines
4.1 KiB
C++

#include <atomic>
#include <chrono>
#include <experimental/optional>
#include <thread>
#include <vector>

#include "gtest/gtest.h"

#include "data_structures/concurrent/skiplist.hpp"
#include "transactions/single_node/engine.hpp"
#include "transactions/transaction.hpp"

using namespace tx;
// Checks the value reported by Engine::GlobalGcSnapshot() while five
// transactions are started and then committed out of order.
TEST(Engine, GcSnapshot) {
  Engine engine;

  // Before any transaction is started the snapshot is expected to be {1}.
  ASSERT_EQ(engine.GlobalGcSnapshot(), Snapshot({1}));

  // Start five transactions; the snapshot must stay {1} after each Begin().
  std::vector<Transaction *> open_txs;
  int started = 0;
  while (started < 5) {
    open_txs.push_back(engine.Begin());
    EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1}));
    ++started;
  }

  // Committing transactions from the middle of the list (the first one is
  // still open) is expected to leave the snapshot unchanged.
  for (const int idx : {1, 2}) {
    engine.Commit(*open_txs[idx]);
    EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1}));
  }

  // Now the first three transactions are committed.
  engine.Commit(*open_txs[0]);
  EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1, 2, 3, 4}));

  // Commit the remaining two transactions.
  for (const int idx : {3, 4}) {
    engine.Commit(*open_txs[idx]);
  }
  EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({6}));
}
// Checks that Engine::Advance() bumps the command id (cid) of the given
// transaction only, leaving another open transaction's cid untouched.
TEST(Engine, Advance) {
  Engine engine;
  Transaction *advanced = engine.Begin();
  Transaction *untouched = engine.Begin();

  // The cid is expected to be 1 before any Advance() call.
  EXPECT_EQ(advanced->cid(), 1);

  // Each Advance() call is expected to raise the cid by one.
  for (const int expected_cid : {2, 3}) {
    engine.Advance(advanced->id_);
    EXPECT_EQ(advanced->cid(), expected_cid);
  }

  // The second transaction was never advanced.
  EXPECT_EQ(untouched->cid(), 1);
}
// Starts 10 threads that each begin 100 transactions and checks that 1000
// transaction ids end up in the shared skiplist.
TEST(Engine, ConcurrentBegin) {
  Engine engine;
  std::vector<std::thread> workers;
  SkipList<TransactionId> tx_ids;

  for (int worker_no = 0; worker_no < 10; ++worker_no) {
    // Each worker owns its own skiplist accessor, created here on the spawning
    // thread and moved into the closure.
    workers.emplace_back([&engine, ids = tx_ids.access()]() mutable {
      int remaining = 100;
      while (remaining-- > 0) {
        ids.insert(engine.Begin()->id_);
      }
    });
  }

  for (auto &worker : workers) {
    worker.join();
  }

  // 10 workers x 100 transactions each.
  EXPECT_EQ(tx_ids.access().size(), 1000);
}
// Checks that Engine::RunningTransaction() maps a transaction id back to the
// pointer that Begin() returned for that transaction.
TEST(Engine, RunningTransaction) {
  Engine engine;
  Transaction *first = engine.Begin();
  Transaction *second = engine.Begin();

  // Looking up the first id yields the first transaction, not the second.
  EXPECT_EQ(first, engine.RunningTransaction(first->id_));
  EXPECT_NE(second, engine.RunningTransaction(first->id_));

  // Looking up the second id yields the second transaction.
  EXPECT_EQ(second, engine.RunningTransaction(second->id_));
}
// Checks that after Engine::EnsureNextIdGreater(42) the next transaction that
// is started receives id 43.
TEST(Engine, EnsureTxIdGreater) {
  Engine engine;

  // Precondition: the first id handed out is not already past the threshold.
  const auto initial_id = engine.Begin()->id_;
  ASSERT_LE(initial_id, 40);

  engine.EnsureNextIdGreater(42);

  const auto next_id = engine.Begin()->id_;
  EXPECT_EQ(next_id, 43);
}
// Exercises Engine::BeginBlocking():
//  1. ten regular transactions are opened on worker threads and kept open,
//  2. a blocking transaction is requested on another thread,
//  3. while it is pending, Begin() and BeginBlocking() are expected to throw
//     TransactionEngineError,
//  4. once the regular transactions commit, the blocking one is expected to
//     finish and the engine to accept new transactions again.
TEST(Engine, BlockingTransaction) {
  Engine engine;
  std::vector<std::thread> threads;
  // Set by the main thread to release the regular transactions.
  std::atomic<bool> finished{false};
  // Set by the blocking thread right before it calls BeginBlocking().
  std::atomic<bool> blocking_started{false};
  // Set by the blocking thread after its transaction has been committed.
  std::atomic<bool> blocking_finished{false};
  // Number of regular transactions that have been started so far.
  std::atomic<int> tx_counter{0};

  for (int i = 0; i < 10; ++i) {
    threads.emplace_back([&engine, &tx_counter, &finished]() {
      auto t = engine.Begin();
      tx_counter++;
      // Keep the transaction open until the main thread releases it.
      while (!finished.load()) {
        std::this_thread::sleep_for(std::chrono::microseconds(100));
      }
      engine.Commit(*t);
    });
  }

  // Wait for all transactions to start.
  do {
    std::this_thread::sleep_for(std::chrono::microseconds(100));
  } while (tx_counter.load() < 10);

  threads.emplace_back([&engine, &blocking_started, &blocking_finished]() {
    // This should block until other transactions end.
    blocking_started.store(true);
    auto t = engine.BeginBlocking(std::experimental::nullopt);
    engine.Commit(*t);
    blocking_finished.store(true);
  });

  // The regular transactions are still open, so the blocking one can't be done.
  EXPECT_FALSE(finished.load());
  EXPECT_FALSE(blocking_finished.load());
  EXPECT_EQ(tx_counter.load(), 10);

  // Make sure the blocking transaction thread kicked off.
  // NOTE(review): `blocking_started` is set *before* BeginBlocking() is
  // called, so this wait does not strictly guarantee the engine has already
  // registered the blocking transaction when the checks below run. This looks
  // like a potential source of flakiness -- confirm against the engine
  // implementation.
  do {
    std::this_thread::sleep_for(std::chrono::microseconds(100));
  } while (!blocking_started.load());

  // Make sure we can't start any new transaction
  EXPECT_THROW(engine.Begin(), TransactionEngineError);
  EXPECT_THROW(engine.BeginBlocking(std::experimental::nullopt), TransactionEngineError);

  // Release regular transactions. This will cause the blocking transaction to
  // end also.
  finished.store(true);

  // Every thread in `threads` was started above and has not been joined or
  // detached, so all of them are joinable here.
  for (auto &t : threads) t.join();
  EXPECT_TRUE(blocking_finished.load());

  // Make sure we can start transactions now.
  {
    auto t = engine.Begin();
    EXPECT_NE(t, nullptr);
    engine.Commit(*t);
  }
  {
    auto t = engine.BeginBlocking(std::experimental::nullopt);
    EXPECT_NE(t, nullptr);
    engine.Commit(*t);
  }
}