Allow concurrent index creation
Summary: Update tests Reviewers: florijan, buda Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1012
This commit is contained in:
parent
4f55d5d78b
commit
2b2de245d1
@ -390,11 +390,6 @@ Currently, once an index is created it cannot be deleted. This feature will be
|
|||||||
implemented very soon. The expected syntax for removing an index will be `DROP
|
implemented very soon. The expected syntax for removing an index will be `DROP
|
||||||
INDEX ON :Label(property)`.
|
INDEX ON :Label(property)`.
|
||||||
|
|
||||||
Note that Memgraph does not support concurrent index building. Attempting to
|
|
||||||
build an index from two concurrent transactions is likely to result in an
|
|
||||||
error (reported to the client) for one of those transactions, regardless of
|
|
||||||
index `label` and `property`. Do not create indices concurrently.
|
|
||||||
|
|
||||||
### Other Features
|
### Other Features
|
||||||
|
|
||||||
The following sections describe some of the other supported features.
|
The following sections describe some of the other supported features.
|
||||||
|
@ -130,12 +130,9 @@ class GraphDb {
|
|||||||
LabelPropertyIndex label_property_index_;
|
LabelPropertyIndex label_property_index_;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flag indicating if index building is in progress. Memgraph does not support
|
* Set of transactions ids which are building indexes currently
|
||||||
* concurrent index builds on the same database (transaction engine), so we
|
|
||||||
* reject index builds if there is one in progress. See
|
|
||||||
* GraphDbAccessor::BuildIndex.
|
|
||||||
*/
|
*/
|
||||||
std::atomic<bool> index_build_in_progress_{false};
|
ConcurrentSet<tx::transaction_id_t> index_build_tx_in_progress_;
|
||||||
|
|
||||||
durability::WriteAheadLog wal_;
|
durability::WriteAheadLog wal_;
|
||||||
|
|
||||||
|
@ -97,19 +97,13 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label,
|
|||||||
const GraphDbTypes::Property &property) {
|
const GraphDbTypes::Property &property) {
|
||||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||||
|
|
||||||
{
|
db_.index_build_tx_in_progress_.access().insert(transaction_->id_);
|
||||||
// switch the build_in_progress to true
|
|
||||||
bool expected = false;
|
|
||||||
if (!db_.index_build_in_progress_.compare_exchange_strong(expected, true))
|
|
||||||
throw IndexBuildInProgressException();
|
|
||||||
}
|
|
||||||
|
|
||||||
// on function exit switch the build_in_progress to false
|
// on function exit switch the build_in_progress to false
|
||||||
utils::OnScopeExit on_exit([this] {
|
utils::OnScopeExit on_exit([this] {
|
||||||
bool expected = true;
|
auto removed =
|
||||||
[[gnu::unused]] bool success =
|
db_.index_build_tx_in_progress_.access().remove(transaction_->id_);
|
||||||
db_.index_build_in_progress_.compare_exchange_strong(expected, false);
|
DCHECK(removed) << "Index creation transaction should be inside set";
|
||||||
DCHECK(success) << "BuildIndexInProgress flag was not set during build";
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const LabelPropertyIndex::Key key(label, property);
|
const LabelPropertyIndex::Key key(label, property);
|
||||||
@ -125,9 +119,15 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label,
|
|||||||
// happend before, or a bit later than CreateIndex to end.
|
// happend before, or a bit later than CreateIndex to end.
|
||||||
{
|
{
|
||||||
auto wait_transactions = db_.tx_engine_.ActiveTransactions();
|
auto wait_transactions = db_.tx_engine_.ActiveTransactions();
|
||||||
|
auto active_index_creation_transactions =
|
||||||
|
db_.index_build_tx_in_progress_.access();
|
||||||
for (auto id : wait_transactions) {
|
for (auto id : wait_transactions) {
|
||||||
if (id == transaction_->id_) continue;
|
if (active_index_creation_transactions.contains(id)) continue;
|
||||||
while (db_.tx_engine_.IsActive(id)) {
|
while (db_.tx_engine_.IsActive(id)) {
|
||||||
|
// Active index creation set could only now start containing that id,
|
||||||
|
// since that thread could have not written to the set set and to avoid
|
||||||
|
// dead-lock we need to make sure we keep track of that
|
||||||
|
if (active_index_creation_transactions.contains(id)) continue;
|
||||||
// TODO reconsider this constant, currently rule-of-thumb chosen
|
// TODO reconsider this constant, currently rule-of-thumb chosen
|
||||||
std::this_thread::sleep_for(std::chrono::microseconds(100));
|
std::this_thread::sleep_for(std::chrono::microseconds(100));
|
||||||
}
|
}
|
||||||
@ -142,8 +142,9 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label,
|
|||||||
vertex.current_);
|
vertex.current_);
|
||||||
}
|
}
|
||||||
// Commit transaction as we finished applying method on newest visible
|
// Commit transaction as we finished applying method on newest visible
|
||||||
// records. Write that transaction's ID to the WAL as the index has been built
|
// records. Write that transaction's ID to the WAL as the index has been
|
||||||
// at this point even if this DBA's transaction aborts for some reason.
|
// built at this point even if this DBA's transaction aborts for some
|
||||||
|
// reason.
|
||||||
auto wal_build_index_tx_id = dba.transaction_id();
|
auto wal_build_index_tx_id = dba.transaction_id();
|
||||||
dba.Commit();
|
dba.Commit();
|
||||||
db_.wal_.BuildIndex(wal_build_index_tx_id, LabelName(label),
|
db_.wal_.BuildIndex(wal_build_index_tx_id, LabelName(label),
|
||||||
|
@ -23,13 +23,6 @@ class IndexExistsException : public utils::BasicException {
|
|||||||
using utils::BasicException::BasicException;
|
using utils::BasicException::BasicException;
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Thrown when attempting to build indexes concurrently */
|
|
||||||
class IndexBuildInProgressException : public utils::BasicException {
|
|
||||||
public:
|
|
||||||
IndexBuildInProgressException()
|
|
||||||
: utils::BasicException("Concurrent index build on the same database") {}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An accessor for the database object: exposes functions for operating on the
|
* An accessor for the database object: exposes functions for operating on the
|
||||||
* database. All the functions in this class should be self-sufficient: for
|
* database. All the functions in this class should be self-sufficient: for
|
||||||
|
@ -2399,12 +2399,6 @@ class CreateIndexCursor : public Cursor {
|
|||||||
db_.BuildIndex(self_.label(), self_.property());
|
db_.BuildIndex(self_.label(), self_.property());
|
||||||
} catch (const IndexExistsException &) {
|
} catch (const IndexExistsException &) {
|
||||||
// Ignore creating an existing index.
|
// Ignore creating an existing index.
|
||||||
} catch (const IndexBuildInProgressException &) {
|
|
||||||
// Report to the end user.
|
|
||||||
did_create_ = false;
|
|
||||||
throw QueryRuntimeException(
|
|
||||||
"Index building already in progress on this database. Memgraph "
|
|
||||||
"does not support concurrent index building.");
|
|
||||||
}
|
}
|
||||||
ctx.is_index_created_ = did_create_ = true;
|
ctx.is_index_created_ = did_create_ = true;
|
||||||
return true;
|
return true;
|
||||||
|
@ -132,41 +132,22 @@ TEST_F(GraphDbAccessorIndex, LabelPropertyIndexCount) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
TEST(GraphDbAccessorIndexApi, LabelPropertyBuildIndexConcurrent) {
|
TEST(GraphDbAccessorIndexApi, LabelPropertyBuildIndexConcurrent) {
|
||||||
|
const int ITER_COUNT = 10;
|
||||||
|
for (int iter = 0; iter < ITER_COUNT; ++iter) {
|
||||||
GraphDb db;
|
GraphDb db;
|
||||||
|
const int THREAD_COUNT = 10;
|
||||||
|
std::vector<std::thread> threads;
|
||||||
|
for (int index = 0; index < THREAD_COUNT; ++index) {
|
||||||
|
threads.emplace_back([&db, index]() {
|
||||||
GraphDbAccessor dba(db);
|
GraphDbAccessor dba(db);
|
||||||
|
|
||||||
// We need to build indices in other threads.
|
|
||||||
auto build_index_async = [&db](int &success, int index) {
|
|
||||||
std::thread([&db, &success, index]() {
|
|
||||||
GraphDbAccessor dba(db);
|
|
||||||
try {
|
|
||||||
dba.BuildIndex(dba.Label("l" + std::to_string(index)),
|
dba.BuildIndex(dba.Label("l" + std::to_string(index)),
|
||||||
dba.Property("p" + std::to_string(index)));
|
dba.Property("p" + std::to_string(index)));
|
||||||
success = 1;
|
|
||||||
} catch (IndexBuildInProgressException &) {
|
});
|
||||||
dba.Abort();
|
}
|
||||||
success = 0;
|
// All threads should end and there shouldn't be any deadlock
|
||||||
|
for (auto &thread : threads) thread.join();
|
||||||
}
|
}
|
||||||
})
|
|
||||||
.detach();
|
|
||||||
};
|
|
||||||
|
|
||||||
int build_1_success = -1;
|
|
||||||
build_index_async(build_1_success, 1);
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
||||||
|
|
||||||
// First index build should now be inside the BuildIndex function waiting for
|
|
||||||
// dba to commit. A second built attempt should fail.
|
|
||||||
int build_2_success = -1;
|
|
||||||
build_index_async(build_2_success, 2);
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(30));
|
|
||||||
EXPECT_EQ(build_1_success, -1);
|
|
||||||
EXPECT_EQ(build_2_success, 0);
|
|
||||||
|
|
||||||
// End dba and expect that first build index finished successfully.
|
|
||||||
dba.Commit();
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(30));
|
|
||||||
EXPECT_EQ(build_1_success, 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#define EXPECT_WITH_MARGIN(x, center) \
|
#define EXPECT_WITH_MARGIN(x, center) \
|
||||||
|
Loading…
Reference in New Issue
Block a user