Drop index
Reviewers: teon.banek, mferencevic, vkasljevic Reviewed By: teon.banek, mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1659
This commit is contained in:
parent
1f07879489
commit
f6f58d843b
@ -178,6 +178,10 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
|
||||
dba->Commit();
|
||||
}
|
||||
|
||||
void GraphDbAccessor::DeleteIndex(storage::Label, storage::Property) {
|
||||
throw utils::NotYetImplemented("Distributed drop index");
|
||||
}
|
||||
|
||||
void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) {
|
||||
// Commit transaction as we finished applying method on newest visible
|
||||
// records. Write that transaction's ID to the WAL as the index has been
|
||||
|
@ -39,7 +39,7 @@ class IndexCreationOnWorkerException : public utils::BasicException {
|
||||
|
||||
/// Thrown on concurrent index creation when the transaction engine fails to
|
||||
/// start a new transaction.
|
||||
class IndexCreationException : public utils::BasicException {
|
||||
class IndexTransactionException : public utils::BasicException {
|
||||
using utils::BasicException::BasicException;
|
||||
};
|
||||
|
||||
@ -458,6 +458,12 @@ class GraphDbAccessor {
|
||||
virtual void BuildIndex(storage::Label label, storage::Property property,
|
||||
bool);
|
||||
|
||||
/// Deletes the index responisble for (label, property).
|
||||
/// At the moment this isn't implemented in distributed.
|
||||
///
|
||||
/// @throws NotYetImplemented
|
||||
void DeleteIndex(storage::Label, storage::Property);
|
||||
|
||||
/// Populates index with vertices containing the key
|
||||
void PopulateIndex(const LabelPropertyIndex::Key &key);
|
||||
|
||||
|
@ -135,7 +135,7 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
|
||||
throw;
|
||||
} catch (const tx::TransactionEngineError &e) {
|
||||
db_.storage().label_property_index_.DeleteIndex(key);
|
||||
throw IndexCreationException(e.what());
|
||||
throw IndexTransactionException(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
@ -144,15 +144,9 @@ void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) {
|
||||
// records. Write that transaction's ID to the WAL as the index has been
|
||||
// built at this point even if this DBA's transaction aborts for some
|
||||
// reason.
|
||||
auto wal_build_index_tx_id = transaction_id();
|
||||
wal().Emplace(database::StateDelta::BuildIndex(
|
||||
wal_build_index_tx_id, key.label_, LabelName(key.label_), key.property_,
|
||||
transaction_id(), key.label_, LabelName(key.label_), key.property_,
|
||||
PropertyName(key.property_), key.unique_));
|
||||
|
||||
// After these two operations we are certain that everything is contained in
|
||||
// the index under the assumption that the original index creation transaction
|
||||
// contained no vertex/edge insert/update before this method was invoked.
|
||||
db_.storage().label_property_index_.IndexFinishedBuilding(key);
|
||||
}
|
||||
|
||||
void GraphDbAccessor::PopulateIndex(const LabelPropertyIndex::Key &key) {
|
||||
@ -167,6 +161,26 @@ void GraphDbAccessor::PopulateIndex(const LabelPropertyIndex::Key &key) {
|
||||
}
|
||||
}
|
||||
|
||||
void GraphDbAccessor::DeleteIndex(storage::Label label,
|
||||
storage::Property property) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
|
||||
LabelPropertyIndex::Key key(label, property);
|
||||
try {
|
||||
auto dba =
|
||||
db_.AccessBlocking(std::experimental::make_optional(transaction_.id_));
|
||||
|
||||
db_.storage().label_property_index_.DeleteIndex(key);
|
||||
dba->wal().Emplace(database::StateDelta::DropIndex(
|
||||
dba->transaction_id(), key.label_, LabelName(key.label_), key.property_,
|
||||
PropertyName(key.property_)));
|
||||
|
||||
dba->Commit();
|
||||
} catch (const tx::TransactionEngineError &e) {
|
||||
throw IndexTransactionException(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void GraphDbAccessor::UpdateLabelIndices(storage::Label label,
|
||||
const VertexAccessor &vertex_accessor,
|
||||
const Vertex *const vertex) {
|
||||
|
@ -38,7 +38,7 @@ class IndexCreationOnWorkerException : public utils::BasicException {
|
||||
|
||||
/// Thrown on concurrent index creation when the transaction engine fails to
|
||||
/// start a new transaction.
|
||||
class IndexCreationException : public utils::BasicException {
|
||||
class IndexTransactionException : public utils::BasicException {
|
||||
using utils::BasicException::BasicException;
|
||||
};
|
||||
|
||||
@ -437,6 +437,12 @@ class GraphDbAccessor {
|
||||
void BuildIndex(storage::Label label, storage::Property property,
|
||||
bool unique);
|
||||
|
||||
/// Deletes the index responisble for (label, property).
|
||||
///
|
||||
/// @throws IndexTransactionException if it can't obtain a blocking
|
||||
/// transaction.
|
||||
void DeleteIndex(storage::Label label, storage::Property property);
|
||||
|
||||
/// Populates index with vertices containing the key
|
||||
void PopulateIndex(const LabelPropertyIndex::Key &key);
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include "durability/single_node/recovery.hpp"
|
||||
|
||||
#include <experimental/filesystem>
|
||||
#include <experimental/optional>
|
||||
#include <limits>
|
||||
#include <unordered_map>
|
||||
|
||||
@ -144,8 +145,9 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
|
||||
RETURN_IF_NOT(it != index_value.end());
|
||||
auto unique = *it++;
|
||||
RETURN_IF_NOT(label.IsString() && property.IsString() && unique.IsBool());
|
||||
recovery_data->indexes.emplace_back(IndexRecoveryData{
|
||||
label.ValueString(), property.ValueString(), unique.ValueBool()});
|
||||
recovery_data->indexes.emplace_back(
|
||||
IndexRecoveryData{label.ValueString(), property.ValueString(),
|
||||
/*create = */ true, unique.ValueBool()});
|
||||
}
|
||||
|
||||
auto dba = db->Access();
|
||||
@ -402,8 +404,14 @@ void RecoverWal(const fs::path &durability_dir, database::GraphDb *db,
|
||||
break;
|
||||
case database::StateDelta::Type::BUILD_INDEX:
|
||||
// TODO index building might still be problematic in HA
|
||||
recovery_data->indexes.emplace_back(IndexRecoveryData{
|
||||
delta.label_name, delta.property_name, delta.unique});
|
||||
recovery_data->indexes.emplace_back(
|
||||
IndexRecoveryData{delta.label_name, delta.property_name,
|
||||
/*create = */ true, delta.unique});
|
||||
break;
|
||||
case database::StateDelta::Type::DROP_INDEX:
|
||||
recovery_data->indexes.emplace_back(
|
||||
IndexRecoveryData{delta.label_name, delta.property_name,
|
||||
/*create = */ false});
|
||||
break;
|
||||
default:
|
||||
transactions->Apply(delta);
|
||||
@ -420,16 +428,17 @@ void RecoverWal(const fs::path &durability_dir, database::GraphDb *db,
|
||||
|
||||
void RecoverIndexes(database::GraphDb *db,
|
||||
const std::vector<IndexRecoveryData> &indexes) {
|
||||
auto db_accessor_indices = db->Access();
|
||||
auto dba = db->Access();
|
||||
for (const auto &index : indexes) {
|
||||
const database::LabelPropertyIndex::Key key{
|
||||
db_accessor_indices->Label(index.label),
|
||||
db_accessor_indices->Property(index.property), index.unique};
|
||||
db_accessor_indices->db().storage().label_property_index().CreateIndex(key);
|
||||
db_accessor_indices->PopulateIndex(key);
|
||||
db_accessor_indices->EnableIndex(key);
|
||||
auto label = dba->Label(index.label);
|
||||
auto property = dba->Property(index.property);
|
||||
if (index.create) {
|
||||
dba->BuildIndex(label, property, index.unique);
|
||||
} else {
|
||||
dba->DeleteIndex(label, property);
|
||||
}
|
||||
}
|
||||
db_accessor_indices->Commit();
|
||||
dba->Commit();
|
||||
}
|
||||
|
||||
} // namespace durability
|
||||
|
@ -39,7 +39,8 @@ struct RecoveryInfo {
|
||||
struct IndexRecoveryData {
|
||||
std::string label;
|
||||
std::string property;
|
||||
bool unique;
|
||||
bool create; // distinguish between creating and dropping index
|
||||
bool unique; // used only when creating an index
|
||||
};
|
||||
|
||||
// A data structure for exchanging info between main recovery function and
|
||||
|
@ -114,6 +114,18 @@ StateDelta StateDelta::BuildIndex(tx::TransactionId tx_id, storage::Label label,
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::DropIndex(tx::TransactionId tx_id, storage::Label label,
|
||||
const std::string &label_name,
|
||||
storage::Property property,
|
||||
const std::string &property_name) {
|
||||
StateDelta op(StateDelta::Type::DROP_INDEX, tx_id);
|
||||
op.label = label;
|
||||
op.label_name = label_name;
|
||||
op.property = property;
|
||||
op.property_name = property_name;
|
||||
return op;
|
||||
}
|
||||
|
||||
void StateDelta::Encode(
|
||||
HashedFileWriter &writer,
|
||||
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) const {
|
||||
@ -166,6 +178,12 @@ void StateDelta::Encode(
|
||||
encoder.WriteString(property_name);
|
||||
encoder.WriteBool(unique);
|
||||
break;
|
||||
case Type::DROP_INDEX:
|
||||
encoder.WriteInt(label.Id());
|
||||
encoder.WriteString(label_name);
|
||||
encoder.WriteInt(property.Id());
|
||||
encoder.WriteString(property_name);
|
||||
break;
|
||||
}
|
||||
|
||||
writer.WriteValue(writer.hash());
|
||||
@ -241,6 +259,12 @@ std::experimental::optional<StateDelta> StateDelta::Decode(
|
||||
DECODE_MEMBER(property_name, ValueString)
|
||||
DECODE_MEMBER(unique, ValueBool)
|
||||
break;
|
||||
case Type::DROP_INDEX:
|
||||
DECODE_MEMBER_CAST(label, ValueInt, storage::Label)
|
||||
DECODE_MEMBER(label_name, ValueString)
|
||||
DECODE_MEMBER_CAST(property, ValueInt, storage::Property)
|
||||
DECODE_MEMBER(property_name, ValueString)
|
||||
break;
|
||||
}
|
||||
|
||||
auto decoder_hash = reader.hash();
|
||||
@ -305,7 +329,8 @@ void StateDelta::Apply(GraphDbAccessor &dba) const {
|
||||
dba.RemoveEdge(edge);
|
||||
break;
|
||||
}
|
||||
case Type::BUILD_INDEX: {
|
||||
case Type::BUILD_INDEX:
|
||||
case Type::DROP_INDEX: {
|
||||
LOG(FATAL) << "Index handling not handled in Apply";
|
||||
break;
|
||||
}
|
||||
|
@ -71,6 +71,7 @@ in StateDeltas.")
|
||||
remove-vertex ;; vertex_id, check_empty
|
||||
remove-edge ;; edge_id
|
||||
build-index ;; label, label_name, property, property_name, unique
|
||||
drop-index ;; label, label_name, property, property_name
|
||||
)
|
||||
(:documentation
|
||||
"Defines StateDelta type. For each type the comment indicates which values
|
||||
@ -125,6 +126,10 @@ omitted in the comment."))
|
||||
const std::string &label_name,
|
||||
storage::Property property,
|
||||
const std::string &property_name, bool unique);
|
||||
static StateDelta DropIndex(tx::TransactionId tx_id, storage::Label label,
|
||||
const std::string &label_name,
|
||||
storage::Property property,
|
||||
const std::string &property_name);
|
||||
|
||||
/// Applies CRUD delta to database accessor. Fails on other types of deltas
|
||||
void Apply(GraphDbAccessor &dba) const;
|
||||
|
@ -146,6 +146,7 @@ bool WriteAheadLog::IsStateDeltaTransactionEnd(
|
||||
case database::StateDelta::Type::REMOVE_VERTEX:
|
||||
case database::StateDelta::Type::REMOVE_EDGE:
|
||||
case database::StateDelta::Type::BUILD_INDEX:
|
||||
case database::StateDelta::Type::DROP_INDEX:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -536,14 +536,24 @@ Callback HandleIndexQuery(IndexQuery *index_query,
|
||||
throw QueryRuntimeException(e.what());
|
||||
}
|
||||
// Otherwise ignore creating an existing index.
|
||||
} catch (const database::IndexCreationException &e) {
|
||||
} catch (const database::IndexTransactionException &e) {
|
||||
throw QueryRuntimeException(e.what());
|
||||
}
|
||||
return std::vector<std::vector<TypedValue>>();
|
||||
};
|
||||
return callback;
|
||||
case IndexQuery::Action::DROP:
|
||||
throw utils::NotYetImplemented("DROP INDEX");
|
||||
callback.fn = [label, properties, db_accessor, invalidate_plan_cache] {
|
||||
try {
|
||||
CHECK(properties.size() == 1);
|
||||
db_accessor->DeleteIndex(label, properties[0]);
|
||||
invalidate_plan_cache();
|
||||
} catch (const database::IndexTransactionException &e) {
|
||||
throw QueryRuntimeException(e.what());
|
||||
}
|
||||
return std::vector<std::vector<TypedValue>>();
|
||||
};
|
||||
return callback;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -73,18 +73,10 @@ class LabelPropertyIndex {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns if it succeded in deleting the index and freeing the index memory
|
||||
* Returns if it succeeded in deleting the index and freeing the index memory
|
||||
*/
|
||||
void DeleteIndex(const Key &key) { indices_.access().remove(key); }
|
||||
|
||||
/**
|
||||
* @brief - Notify that the index has been populated with everything it should
|
||||
* be populated with, and can be used from this moment forward without missing
|
||||
* any records.
|
||||
* @param key - index which finished being populated.
|
||||
*/
|
||||
void IndexFinishedBuilding(const Key &key) {
|
||||
ready_for_use_.access().insert(key);
|
||||
void DeleteIndex(const Key &key) {
|
||||
indices_.access().remove(key);
|
||||
}
|
||||
|
||||
/** NOTE: All update methods aren't supporting the case where two threads
|
||||
@ -218,7 +210,7 @@ class LabelPropertyIndex {
|
||||
* key sorted ascendingly by the property value.
|
||||
*/
|
||||
auto GetVlists(const Key &key, const tx::Transaction &t, bool current_state) {
|
||||
DCHECK(ready_for_use_.access().contains(key)) << "Index not yet ready.";
|
||||
DCHECK(IndexExists(key)) << "Index not yet ready.";
|
||||
auto access = GetKeyStorage(key)->access();
|
||||
auto begin = access.begin();
|
||||
return index::GetVlists<typename SkipList<IndexEntry>::Iterator, IndexEntry,
|
||||
@ -246,7 +238,7 @@ class LabelPropertyIndex {
|
||||
*/
|
||||
auto GetVlists(const Key &key, const PropertyValue &value,
|
||||
const tx::Transaction &t, bool current_state) {
|
||||
DCHECK(ready_for_use_.access().contains(key)) << "Index not yet ready.";
|
||||
DCHECK(IndexExists(key)) << "Index not yet ready.";
|
||||
auto access = GetKeyStorage(key)->access();
|
||||
auto min_ptr = std::numeric_limits<std::uintptr_t>::min();
|
||||
auto start_iter = access.find_or_larger(IndexEntry(
|
||||
@ -294,7 +286,7 @@ class LabelPropertyIndex {
|
||||
const std::experimental::optional<utils::Bound<PropertyValue>> lower,
|
||||
const std::experimental::optional<utils::Bound<PropertyValue>> upper,
|
||||
const tx::Transaction &transaction, bool current_state) {
|
||||
DCHECK(ready_for_use_.access().contains(key)) << "Index not yet ready.";
|
||||
DCHECK(IndexExists(key)) << "Index not yet ready.";
|
||||
|
||||
auto type = [](const auto &bound) { return bound.value().value().type(); };
|
||||
CHECK(lower || upper) << "At least one bound must be provided";
|
||||
@ -361,7 +353,8 @@ class LabelPropertyIndex {
|
||||
* @return true if the index with that key exists
|
||||
*/
|
||||
bool IndexExists(const Key &key) {
|
||||
return ready_for_use_.access().contains(key);
|
||||
auto access = indices_.access();
|
||||
return access.find(key) != access.end();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -378,7 +371,6 @@ class LabelPropertyIndex {
|
||||
int64_t Count(const Key &key) {
|
||||
auto index = GetKeyStorage(key);
|
||||
CHECK(index != nullptr) << "Index doesn't exist.";
|
||||
CHECK(ready_for_use_.access().contains(key)) << "Index not yet ready.";
|
||||
return index->access().size();
|
||||
}
|
||||
|
||||
@ -620,6 +612,5 @@ class LabelPropertyIndex {
|
||||
}
|
||||
|
||||
ConcurrentMap<Key, std::unique_ptr<SkipList<IndexEntry>>> indices_;
|
||||
SkipList<Key> ready_for_use_;
|
||||
};
|
||||
} // namespace database
|
||||
|
@ -43,6 +43,8 @@ std::string StateDeltaTypeToString(database::StateDelta::Type type) {
|
||||
return "REMOVE_EDGE";
|
||||
case database::StateDelta::Type::BUILD_INDEX:
|
||||
return "BUILD_INDEX";
|
||||
case database::StateDelta::Type::DROP_INDEX:
|
||||
return "DROP_INDEX";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,6 @@ class LabelPropertyIndexComplexTest : public ::testing::Test {
|
||||
|
||||
key = new LabelPropertyIndex::Key(label, property);
|
||||
EXPECT_EQ(index.CreateIndex(*key), true);
|
||||
index.IndexFinishedBuilding(*key);
|
||||
|
||||
t = engine.Begin();
|
||||
vlist = new mvcc::VersionList<Vertex>(*t, 0);
|
||||
@ -88,8 +87,6 @@ TEST(LabelPropertyIndex, IndexExistance) {
|
||||
LabelPropertyIndex index;
|
||||
EXPECT_EQ(index.CreateIndex(key), true);
|
||||
// Index doesn't exist - and can't be used untill it's been notified as built.
|
||||
EXPECT_EQ(index.IndexExists(key), false);
|
||||
index.IndexFinishedBuilding(key);
|
||||
EXPECT_EQ(index.IndexExists(key), true);
|
||||
}
|
||||
|
||||
@ -100,13 +97,7 @@ TEST(LabelPropertyIndex, Count) {
|
||||
auto property = accessor->Property("property");
|
||||
LabelPropertyIndex::Key key(label, property);
|
||||
LabelPropertyIndex index;
|
||||
::testing::FLAGS_gtest_death_test_style = "threadsafe";
|
||||
|
||||
EXPECT_DEATH(index.Count(key), "Index doesn't exist.");
|
||||
EXPECT_EQ(index.CreateIndex(key), true);
|
||||
EXPECT_DEATH(index.Count(key), "Index not yet ready.");
|
||||
|
||||
index.IndexFinishedBuilding(key);
|
||||
EXPECT_EQ(index.Count(key), 0);
|
||||
}
|
||||
|
||||
|
@ -356,6 +356,7 @@ TEST_F(Durability, WalEncoding) {
|
||||
ASSERT_EQ(e0.gid(), gid0);
|
||||
e0.PropsSet(dba->Property("p0"), std::vector<PropertyValue>{1, 2, 3});
|
||||
dba->BuildIndex(dba->Label("l1"), dba->Property("p1"), false);
|
||||
dba->DeleteIndex(dba->Label("l1"), dba->Property("p1"));
|
||||
dba->Commit();
|
||||
|
||||
db.wal().Flush();
|
||||
@ -386,7 +387,7 @@ TEST_F(Durability, WalEncoding) {
|
||||
}
|
||||
}
|
||||
reader.Close();
|
||||
ASSERT_EQ(deltas.size(), 11);
|
||||
ASSERT_EQ(deltas.size(), 14);
|
||||
|
||||
using Type = enum database::StateDelta::Type;
|
||||
EXPECT_EQ(deltas[0].type, Type::TRANSACTION_BEGIN);
|
||||
@ -408,9 +409,18 @@ TEST_F(Durability, WalEncoding) {
|
||||
EXPECT_EQ(deltas[8].type, Type::BUILD_INDEX);
|
||||
EXPECT_EQ(deltas[8].label_name, "l1");
|
||||
EXPECT_EQ(deltas[8].property_name, "p1");
|
||||
EXPECT_EQ(deltas[8].unique, false);
|
||||
EXPECT_EQ(deltas[9].type, Type::TRANSACTION_COMMIT);
|
||||
EXPECT_EQ(deltas[10].type, Type::TRANSACTION_COMMIT);
|
||||
EXPECT_EQ(deltas[10].transaction_id, 1);
|
||||
|
||||
// The next two deltas are the DeleteIndex internal transactions.
|
||||
EXPECT_EQ(deltas[10].type, Type::TRANSACTION_BEGIN);
|
||||
EXPECT_EQ(deltas[11].type, Type::DROP_INDEX);
|
||||
EXPECT_EQ(deltas[11].label_name, "l1");
|
||||
EXPECT_EQ(deltas[11].property_name, "p1");
|
||||
EXPECT_EQ(deltas[12].type, Type::TRANSACTION_COMMIT);
|
||||
|
||||
EXPECT_EQ(deltas[13].type, Type::TRANSACTION_COMMIT);
|
||||
EXPECT_EQ(deltas[13].transaction_id, 1);
|
||||
}
|
||||
|
||||
TEST_F(Durability, SnapshotEncoding) {
|
||||
|
@ -121,6 +121,17 @@ TEST_F(GraphDbAccessorIndex, LabelPropertyIndexBuild) {
|
||||
EXPECT_EQ(dba->VerticesCount(label, property2), 0);
|
||||
}
|
||||
|
||||
TEST_F(GraphDbAccessorIndex, LabelPropertyIndexDelete) {
|
||||
dba->BuildIndex(label, property, false);
|
||||
Commit();
|
||||
EXPECT_TRUE(dba->LabelPropertyIndexExists(label, property));
|
||||
|
||||
dba->DeleteIndex(label, property);
|
||||
Commit();
|
||||
|
||||
EXPECT_FALSE(dba->LabelPropertyIndexExists(label, property));
|
||||
}
|
||||
|
||||
TEST_F(GraphDbAccessorIndex, LabelPropertyIndexBuildTwice) {
|
||||
dba->BuildIndex(label, property, false);
|
||||
EXPECT_THROW(dba->BuildIndex(label, property, false), utils::BasicException);
|
||||
@ -150,7 +161,7 @@ TEST(GraphDbAccessorIndexApi, LabelPropertyBuildIndexConcurrent) {
|
||||
dba->BuildIndex(dba->Label("l" + std::to_string(index)),
|
||||
dba->Property("p" + std::to_string(index)), false);
|
||||
// If it throws, make sure the exception is right.
|
||||
} catch (const database::IndexCreationException &e) {
|
||||
} catch (const database::IndexTransactionException &e) {
|
||||
// Nothing to see here, move along.
|
||||
} catch (...) {
|
||||
failed.store(true);
|
||||
|
Loading…
Reference in New Issue
Block a user