Labels implementation.

Summary: Labels implementation

Reviewers: buda, teon.banek

Reviewed By: buda, teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D201
This commit is contained in:
Dominik Gleich 2017-04-03 11:26:32 +02:00
parent 3ad55dd352
commit 863f55dc2f
10 changed files with 290 additions and 57 deletions

View File

@ -13,6 +13,12 @@
#include "data_structures/concurrent/skiplist_gc.hpp"
/**
* computes the height for the new node from the interval [1...H]
* with p(k) = (1/2)^k for all k from the interval
*/
static thread_local FastBinomial<> rnd;
/** @brief Concurrent lock-based skiplist with fine grained locking
*
* From Wikipedia:
@ -97,12 +103,6 @@
template <class T, size_t H = 32, class lock_t = SpinLock>
class SkipList : private Lockable<lock_t> {
public:
/**
* computes the height for the new node from the interval [1...H]
* with p(k) = (1/2)^k for all k from the interval
*/
static thread_local FastBinomial<H> rnd;
/** @brief Wrapper class for flags used in the implementation
*
* MARKED flag is used to logically delete a node.
@ -909,7 +909,7 @@ class SkipList : private Lockable<lock_t> {
// // will be at least 2 height and
// will
// // be added in front.
// height = rnd();
// height = rnd(H);
// // if (height == 1) height = 2;
// } else {
@ -928,7 +928,7 @@ class SkipList : private Lockable<lock_t> {
}
}
} else {
height = rnd();
height = rnd(H);
// Optimization which doesn't add any extra locking.
if (height == 1)
height = 2; // Same key list will be skipped more often.
@ -969,7 +969,7 @@ class SkipList : private Lockable<lock_t> {
return {Iterator{succs[level]}, false};
}
auto height = rnd();
auto height = rnd(H);
guard_t guards[H];
// try to acquire the locks for predecessors up to the height of
@ -1004,7 +1004,7 @@ class SkipList : private Lockable<lock_t> {
return {Iterator{succs[level]}, false};
}
auto height = rnd();
auto height = rnd(H);
guard_t guards[H];
// try to acquire the locks for predecessors up to the height of
@ -1040,7 +1040,7 @@ class SkipList : private Lockable<lock_t> {
return {Iterator{succs[level]}, false};
}
auto height = rnd();
auto height = rnd(H);
guard_t guards[H];
// try to acquire the locks for predecessors up to the height of
@ -1136,6 +1136,3 @@ class SkipList : private Lockable<lock_t> {
Node *header;
SkiplistGC<Node> gc;
};
template <class T, size_t H, class lock_t>
thread_local FastBinomial<H> SkipList<T, H, lock_t>::rnd;

View File

@ -5,6 +5,7 @@
#include "data_structures/concurrent/concurrent_set.hpp"
#include "data_structures/concurrent/skiplist.hpp"
#include "database/graph_db_datatypes.hpp"
#include "database/indexes/labels_index.hpp"
#include "mvcc/version_list.hpp"
#include "storage/edge.hpp"
#include "storage/garbage_collector.hpp"
@ -64,4 +65,7 @@ class GraphDb {
ConcurrentSet<std::string> labels_;
ConcurrentSet<std::string> edge_types_;
ConcurrentSet<std::string> properties_;
// indexes
LabelsIndex<Vertex> labels_index_;
};

View File

@ -30,7 +30,7 @@ class GraphDbAccessor {
*
* @param db The database
*/
GraphDbAccessor(GraphDb& db);
GraphDbAccessor(GraphDb &db);
~GraphDbAccessor();
// the GraphDbAccessor can NOT be copied nor moved because
@ -45,7 +45,7 @@ class GraphDbAccessor {
/**
* Returns the name of the database of this accessor.
*/
const std::string& name() const;
const std::string &name() const;
/**
* Creates a new Vertex and returns an accessor to it.
@ -62,7 +62,7 @@ class GraphDbAccessor {
* @param vertex_accessor Accessor to vertex.
* @return If or not the vertex was deleted.
*/
bool remove_vertex(VertexAccessor& vertex_accessor);
bool remove_vertex(VertexAccessor &vertex_accessor);
/**
* Removes the vertex of the given accessor along with all it's outgoing
@ -70,7 +70,7 @@ class GraphDbAccessor {
*
* @param vertex_accessor Accessor to a vertex.
*/
void detach_remove_vertex(VertexAccessor& vertex_accessor);
void detach_remove_vertex(VertexAccessor &vertex_accessor);
/**
* Returns iterable over accessors to all the vertices in the graph
@ -96,7 +96,7 @@ class GraphDbAccessor {
* @param type Edge type.
* @return An accessor to the edge.
*/
EdgeAccessor insert_edge(VertexAccessor& from, VertexAccessor& to,
EdgeAccessor insert_edge(VertexAccessor &from, VertexAccessor &to,
GraphDbTypes::EdgeType type);
/**
@ -104,7 +104,7 @@ class GraphDbAccessor {
*
* @param edge_accessor The accessor to an edge.
*/
void remove_edge(EdgeAccessor& edge_accessor);
void remove_edge(EdgeAccessor &edge_accessor);
/**
* Returns iterable over accessors to all the edges in the graph
@ -122,11 +122,42 @@ class GraphDbAccessor {
std::move(filtered));
}
/**
* Insert this record into corresponding label index.
* @param label - label index into which to insert record
* @param record - record which to insert
*/
template <typename TRecord>
void update_index(const GraphDbTypes::Label &label, const TRecord &record) {
db_.labels_index_.Add(label, record.vlist_);
}
/**
* Return VertexAccessors which contain the current label for the current
* transaction visibilty.
* @param label - label for which to return VertexAccessors
* @return iterable collection
*/
auto vertices_by_label(const GraphDbTypes::Label &label) {
return iter::imap(
[this](auto vlist) { return VertexAccessor(*vlist, *this); },
db_.labels_index_.Acquire(label, *transaction_));
}
/**
* Return approximate number of vertices under indexes with the given label.
* Note that this is always an over-estimate and never an under-estimate.
* @param label - label to check for
* @return number of vertices with the given label
*/
size_t vertices_by_label_count(const GraphDbTypes::Label &label) {
return db_.labels_index_.Count(label);
}
/**
* Obtains the Label for the label's name.
* @return See above.
*/
GraphDbTypes::Label label(const std::string& label_name);
GraphDbTypes::Label label(const std::string &label_name);
/**
* Obtains the label name (a string) for the given label.
@ -134,13 +165,13 @@ class GraphDbAccessor {
* @param label a Label.
* @return See above.
*/
std::string& label_name(const GraphDbTypes::Label label) const;
std::string &label_name(const GraphDbTypes::Label label) const;
/**
* Obtains the EdgeType for it's name.
* @return See above.
*/
GraphDbTypes::EdgeType edge_type(const std::string& edge_type_name);
GraphDbTypes::EdgeType edge_type(const std::string &edge_type_name);
/**
* Obtains the edge type name (a string) for the given edge type.
@ -148,13 +179,13 @@ class GraphDbAccessor {
* @param edge_type an EdgeType.
* @return See above.
*/
std::string& edge_type_name(const GraphDbTypes::EdgeType edge_type) const;
std::string &edge_type_name(const GraphDbTypes::EdgeType edge_type) const;
/**
* Obtains the Property for it's name.
* @return See above.
*/
GraphDbTypes::Property property(const std::string& property_name);
GraphDbTypes::Property property(const std::string &property_name);
/**
* Obtains the property name (a string) for the given property.
@ -162,7 +193,7 @@ class GraphDbAccessor {
* @param property a Property.
* @return See above.
*/
std::string& property_name(const GraphDbTypes::Property property) const;
std::string &property_name(const GraphDbTypes::Property property) const;
/**
* Advances transaction's command id by 1.
@ -184,7 +215,7 @@ class GraphDbAccessor {
* @args accessor whose record to initialize.
*/
template <typename TRecord>
void init_record(RecordAccessor<TRecord>& accessor) {
void init_record(RecordAccessor<TRecord> &accessor) {
accessor.record_ = accessor.vlist_->find(*transaction_);
}
@ -193,16 +224,16 @@ class GraphDbAccessor {
* @args accessor whose record to update if possible.
*/
template <typename TRecord>
void update(RecordAccessor<TRecord>& accessor) {
void update(RecordAccessor<TRecord> &accessor) {
if (!accessor.record_->is_visible_write(*transaction_))
accessor.record_ = accessor.vlist_->update(*transaction_);
}
private:
GraphDb& db_;
GraphDb &db_;
/** The current transaction */
tx::Transaction* const transaction_;
tx::Transaction *const transaction_;
bool commited_{false};
bool aborted_{false};

View File

@ -0,0 +1,82 @@
#pragma once
#include "cppitertools/filter.hpp"
#include "cppitertools/imap.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_datatypes.hpp"
#include "mvcc/version_list.hpp"
#include "transactions/transaction.hpp"
/**
* @brief Implements index update and acquire.
* @Tparam T - underlying type in version list.
*/
template <typename T>
class LabelsIndex {
public:
/**
* @brief - Add vlist, if new, to label specific storage.
* @param label - label index to update.
* @param vlist - pointer to vlist entry to add.
*/
void Add(const GraphDbTypes::Label &label, mvcc::VersionList<T> *vlist) {
GetLabel(label)->access().insert(vlist);
}
/**
* @brief - Acquire all the inserted vlists in label specific storage which
* still have that label visible in this transaction.
* @param label - label to query.
* @param t - current transaction, which determines visibility.
* @return iterable collection of vlists records<T> with the requested label.
*/
auto Acquire(const GraphDbTypes::Label &label, const tx::Transaction &t) {
auto label_index = GetLabel(label);
return iter::filter(
[this, &label, &t](auto vlist) {
auto vlist_head = vlist->find(t);
if (vlist_head == nullptr) return false;
auto labels = vlist_head->labels_;
// We have to check for existance of label because the transaction
// might not see the label, or the label was deleted and not yet
// removed from the index.
return std::find(labels.begin(), labels.end(), label) != labels.end();
},
label_index->access());
}
/**
* @brief - Return number of items in skiplist associated with the given
* label. This number could be imprecise because of the underlying skiplist
* storage. Use this as a hint, and not as a rule.
* @param label - label to query for.
* @return number of items
*/
auto Count(const GraphDbTypes::Label &label) {
return GetLabel(label)->access().size();
}
private:
/**
* @brief - Get storage for this label. Creates new
* storage if this label is not yet indexed.
* @param label - Label for which to access storage.
* @return pointer to skiplist of version list records<T>.
*/
auto GetLabel(const GraphDbTypes::Label &label) {
auto access = index_.access();
auto iter = access.find(label);
if (iter == access.end()) {
auto skiplist = new SkipList<mvcc::VersionList<T> *>;
auto ret = access.insert(label, skiplist);
// In case some other insert managed to create new skiplist we shouldn't
// leak memory and should delete this one accordingly.
if (ret.second == false) delete skiplist;
return ret.first->second;
}
return iter->second;
}
ConcurrentMap<GraphDbTypes::Label, SkipList<mvcc::VersionList<T> *> *> index_;
};

View File

@ -5,6 +5,7 @@
#include <vector>
#include "database/graph_db.hpp"
#include "database/graph_db_datatypes.hpp"
#include "query/backend/cpp/typed_value.hpp"
#include "query/frontend/ast/ast_visitor.hpp"
#include "utils/visitor/visitable.hpp"

View File

@ -36,8 +36,8 @@ class RecordAccessor {
* @param vlist MVCC record that this accessor wraps.
* @param db_accessor The DB accessor that "owns" this record accessor.
*/
RecordAccessor(mvcc::VersionList<TRecord>& vlist,
GraphDbAccessor& db_accessor);
RecordAccessor(mvcc::VersionList<TRecord> &vlist,
GraphDbAccessor &db_accessor);
/**
* @param vlist MVCC record that this accessor wraps.
@ -48,8 +48,8 @@ class RecordAccessor {
* accept an already found record.
* @param db_accessor The DB accessor that "owns" this record accessor.
*/
RecordAccessor(mvcc::VersionList<TRecord>& vlist, TRecord& record,
GraphDbAccessor& db_accessor);
RecordAccessor(mvcc::VersionList<TRecord> &vlist, TRecord &record,
GraphDbAccessor &db_accessor);
// this class is default copyable, movable and assignable
RecordAccessor(const RecordAccessor &other) = default;
@ -62,7 +62,7 @@ class RecordAccessor {
* @param key
* @return
*/
const PropertyValue& PropsAt(GraphDbTypes::Property key) const;
const PropertyValue &PropsAt(GraphDbTypes::Property key) const;
/**
* Sets a value on the record for the given property.
@ -93,10 +93,10 @@ class RecordAccessor {
* Returns the properties of this record.
* @return
*/
const PropertyValueStore<GraphDbTypes::Property>& Properties() const;
const PropertyValueStore<GraphDbTypes::Property> &Properties() const;
void PropertiesAccept(std::function<void(const GraphDbTypes::Property key,
const PropertyValue& prop)>
const PropertyValue &prop)>
handler,
std::function<void()> finish = {}) const;
@ -104,21 +104,21 @@ class RecordAccessor {
* This should be used with care as it's comparing vlist_ pointer records and
* not actual values inside RecordAccessors.
*/
friend bool operator<(const RecordAccessor& a, const RecordAccessor& b) {
friend bool operator<(const RecordAccessor &a, const RecordAccessor &b) {
debug_assert(a.db_accessor_ == b.db_accessor_,
"Not in the same transaction."); // assume the same
// db_accessor / transaction
return a.vlist_ < b.vlist_;
}
friend bool operator==(const RecordAccessor& a, const RecordAccessor& b) {
friend bool operator==(const RecordAccessor &a, const RecordAccessor &b) {
debug_assert(a.db_accessor_ == b.db_accessor_,
"Not in the same transaction."); // assume the same
// db_accessor / transaction
return a.vlist_ == b.vlist_;
}
friend bool operator!=(const RecordAccessor& a, const RecordAccessor& b) {
friend bool operator!=(const RecordAccessor &a, const RecordAccessor &b) {
debug_assert(a.db_accessor_ == b.db_accessor_,
"Not in the same transaction."); // assume the same
// db_accessor / transaction
@ -130,7 +130,7 @@ class RecordAccessor {
*
* @return See above.
*/
GraphDbAccessor& db_accessor() const;
GraphDbAccessor &db_accessor() const;
/**
* Returns a temporary ID of the record stored in this accessor.
@ -179,18 +179,16 @@ class RecordAccessor {
*
* @return See above.
*/
TRecord& update();
TRecord &update();
/**
* Returns a version of the record that is only for viewing.
*
* @return See above.
*/
const TRecord& view() const;
const TRecord &view() const;
private:
// The database accessor for which this record accessor is created
// Provides means of getting to the transaction and database functions.
// Immutable, set in the constructor and never changed.
@ -211,5 +209,5 @@ class RecordAccessor {
* Stored as a pointer due to it's mutability (the update() function changes
* it).
*/
TRecord* record_;
TRecord *record_;
};

View File

@ -1,5 +1,6 @@
#include <algorithm>
#include "database/graph_db_accessor.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/util.hpp"
#include "storage/vertex_accessor.hpp"
@ -15,6 +16,7 @@ bool VertexAccessor::add_label(GraphDbTypes::Label label) {
// not a duplicate label, add it
update().labels_.emplace_back(label);
this->db_accessor().update_index(label, *this);
return true;
}

View File

@ -3,7 +3,7 @@
#include "utils/likely.hpp"
#include "utils/random/xorshift128plus.hpp"
template <size_t N, class R = Xorshift128plus>
template <class R = Xorshift128plus>
class FastBinomial {
// fast binomial draws coin tosses from a single generated random number
// let's draw a random 4 bit number and count trailing ones
@ -26,17 +26,18 @@ class FastBinomial {
// ------------------
// 16 1111 -> 5 =====
static constexpr uint64_t mask = (1ULL << N) - 1;
public:
unsigned operator()() {
/**
* Return random number X between 1 and tparam N with probability 2^-X.
*/
unsigned operator()(const int n) {
while (true) {
// couting trailing ones is equal to counting trailing zeros
// since the probability for both is 1/2 and we're going to
// count zeros because they are easier to work with
// generate a random number
auto x = random() & mask;
auto x = random() & mask(n);
// if we have all zeros, then we have an invalid case and we
// need to generate again, we have this every (1/2)^N times
@ -51,5 +52,6 @@ class FastBinomial {
}
private:
uint64_t mask(const int n) { return (1ULL << n) - 1; }
R random;
};

View File

@ -13,7 +13,7 @@
#include "utils/random/fast_binomial.hpp"
static constexpr unsigned B = 24;
static thread_local FastBinomial<B> rnd;
static thread_local FastBinomial<> rnd;
static constexpr unsigned M = 4;
static constexpr size_t N = 1ULL << 34;
@ -22,7 +22,8 @@ static constexpr size_t per_thread_iters = N / M;
std::array<std::atomic<uint64_t>, B> buckets;
void generate() {
for (size_t i = 0; i < per_thread_iters; ++i) buckets[rnd() - 1].fetch_add(1);
for (size_t i = 0; i < per_thread_iters; ++i)
buckets[rnd(B) - 1].fetch_add(1);
}
int main(void) {
@ -33,15 +34,15 @@ int main(void) {
std::array<std::thread, M> threads;
for (auto& bucket : buckets) bucket.store(0);
for (auto &bucket : buckets) bucket.store(0);
for (auto& t : threads) t = std::thread([]() { generate(); });
for (auto &t : threads) t = std::thread([]() { generate(); });
for (auto& t : threads) t.join();
for (auto &t : threads) t.join();
auto max = std::accumulate(
buckets.begin(), buckets.end(), (uint64_t)0,
[](auto& acc, auto& x) { return std::max(acc, x.load()); });
[](auto &acc, auto &x) { return std::max(acc, x.load()); });
std::cout << std::fixed;

115
tests/unit/labels_index.cpp Normal file
View File

@ -0,0 +1,115 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "data_structures/ptr_int.hpp"
#include "database/graph_db_accessor.hpp"
#include "dbms/dbms.hpp"
using testing::UnorderedElementsAreArray;
// Test counter of indexed vertices with the given label.
TEST(LabelsIndex, Count) {
Dbms dbms;
auto accessor = dbms.active();
const int ITERS = 50;
size_t cnt = 0;
for (int i = 0; i < ITERS; ++i) {
auto vertex = accessor->insert_vertex();
if (rand() & 1) {
vertex.add_label(accessor->label("test"));
++cnt;
} else {
vertex.add_label(accessor->label("test2"));
}
// Greater or equal since we said that we always estimate at least the
// real number.
EXPECT_GE(accessor->vertices_by_label_count(accessor->label("test")), cnt);
}
}
// Transaction hasn't ended and so the vertex is not visible.
TEST(LabelsIndex, AddGetZeroLabels) {
Dbms dbms;
auto accessor = dbms.active();
auto vertex = accessor->insert_vertex();
vertex.add_label(accessor->label("test"));
accessor->commit();
auto collection = accessor->vertices_by_label(accessor->label("test"));
std::vector<VertexAccessor> collection_vector;
EXPECT_EQ(collection_vector.size(), (size_t)0);
}
// Test label index by adding and removing one vertex, and removing label from
// another, while the third one with an irrelevant label exists.
TEST(LabelsIndex, AddGetRemoveLabel) {
Dbms dbms;
{
auto accessor = dbms.active();
auto vertex1 = accessor->insert_vertex();
vertex1.add_label(accessor->label("test"));
auto vertex2 = accessor->insert_vertex();
vertex2.add_label(accessor->label("test2"));
auto vertex3 = accessor->insert_vertex();
vertex3.add_label(accessor->label("test"));
accessor->commit();
} // Finish transaction.
{
auto accessor = dbms.active();
auto filtered = accessor->vertices_by_label(accessor->label("test"));
std::vector<VertexAccessor> collection(filtered.begin(), filtered.end());
auto vertices = accessor->vertices();
std::vector<VertexAccessor> expected_collection;
for (auto vertex : vertices) {
if (vertex.has_label(accessor->label("test"))) {
expected_collection.push_back(vertex);
} else {
EXPECT_TRUE(vertex.has_label(accessor->label("test2")));
}
}
EXPECT_EQ(expected_collection.size(), collection.size());
EXPECT_TRUE(collection[0].has_label(accessor->label("test")));
EXPECT_TRUE(collection[1].has_label(accessor->label("test")));
EXPECT_FALSE(collection[0].has_label(accessor->label("test2")));
EXPECT_FALSE(collection[1].has_label(accessor->label("test2")));
accessor->remove_vertex(collection[0]); // Remove from database and test if
// index won't return it.
// Remove label from the vertex and add new label.
collection[1].remove_label(accessor->label("test"));
collection[1].add_label(accessor->label("test2"));
accessor->commit();
}
{
auto accessor = dbms.active();
auto filtered = accessor->vertices_by_label(accessor->label("test"));
std::vector<VertexAccessor> collection(filtered.begin(), filtered.end());
auto vertices = accessor->vertices();
std::vector<VertexAccessor> expected_collection;
for (auto vertex : vertices) {
if (vertex.has_label(accessor->label("test"))) {
expected_collection.push_back(vertex);
} else {
EXPECT_TRUE(vertex.has_label(accessor->label("test2")));
}
}
// It should be empty since everything with an old label is either deleted
// or doesn't have that label anymore.
EXPECT_EQ(expected_collection.size(), 0);
EXPECT_EQ(collection.size(), 0);
}
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}