SkipList - added position_and_count function

Summary: This is the first implementation that seems to work. I am not happy with its complexity. I might attempt a simpler implementation, at the cost of some performance.

Reviewers: dgleich, buda

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D502
This commit is contained in:
florijan 2017-06-20 16:01:04 +02:00
parent a726ac0023
commit 19c0dfe084
2 changed files with 208 additions and 1 deletions

View File

@ -527,6 +527,106 @@ class SkipList : private Lockable<lock_t> {
return skiplist->find_or_larger<It, K>(item);
}
/**
 * Position and count estimation. Gives estimates
 * on the position of the given item in this skiplist, and
 * the number of identical items according to 'greater'.
 *
 * If `item` is not contained in the skiplist,
 * then the position where it would be inserted is returned
 * as the position estimate, and 0 as count estimate.
 *
 * Position and count detection works by iterating over the
 * list at a certain level. These levels can be tuned as
 * a performance vs precision optimization. Lower levels mean
 * higher precision, higher levels mean better performance.
 * TODO: tune the levels once benchmarks are available.
 *
 * @param item The item for which the position is estimated.
 * @param greater Comparison function. It must be partially
 *    consistent with natural comparison of Skiplist elements:
 *    if `greater` indicates that X is greater than
 *    Y, then natural comparison must indicate the same. The
 *    reverse does not have to hold.
 * @param position_level_reduction - Defines at which level
 *    item position is estimated. Position level is defined
 *    as log2(skiplist->size()) - position_level_reduction,
 *    and is never lower than 0.
 * @param count_max_level - Defines the max level at which
 *    item count is estimated. Negative values are treated as 0.
 * @tparam TItem Type of `item`.
 * @tparam TGreater Type of `greater`
 * @return A pair of ints where the first element is the estimated
 *    position of item, and the second is the estimated number
 *    of items that are the same according to `greater`.
 */
template <typename TItem, typename TGreater = std::greater<T>>
auto position_and_count(const TItem &item, TGreater greater = TGreater{},
                        int position_level_reduction = 10,
                        int count_max_level = 3) {
  // the level at which position will be sought
  int position_level = std::max(
      0, static_cast<int>(std::lround(std::log2(skiplist->size()))) -
             position_level_reduction);

  Node *pred = skiplist->header;
  Node *succ = nullptr;

  int position = 0;
  for (int i = position_level; i >= 0; i--) {
    // count how many towers we pass on this level,
    // used for calculating item position
    int tower_count = 0;

    succ = pred->forward(i);
    while (succ && greater(item, succ->value())) {
      pred = succ;
      succ = succ->forward(i);
      tower_count++;
    }

    // in `succs` (declared outside of this function) we'll keep
    // track of successors that are equal to item, or nullptr otherwise
    succs[i] = (!succ || greater(succ->value(), item)) ? nullptr : succ;

    // every tower passed on level i is estimated to span 2^i elements
    position += (1 << i) * tower_count;
  }

  // After the level 0 pass `succ` is the first element that is not
  // less than item. If there is no such element, or if it is greater
  // than item, succs[0] is nullptr and item is not in the skiplist.
  // Returning here also keeps the counting code below from
  // dereferencing a null successor.
  if (succs[0] == nullptr) return std::make_pair(position, 0);

  // now we need to estimate the count of elements equal to item
  // we'll do that by looking for the first element that is greater
  // than item, and counting how far we have to look

  // first find the rightmost (highest) succ that has value == item
  int count_level = 0;
  for (int i = position_level; i >= 0; i--)
    if (succs[i]) {
      count_level = i;
      break;
    }

  // respect the caller's level limit, but never index below level 0
  count_level = std::max(0, std::min(count_level, count_max_level));
  succ = succs[count_level];

  // now expand to the right as long as element value == item
  // at the same time accumulate count
  int count = 1 << count_level;
  for (; count_level >= 0; count_level--) {
    Node *next = succ->forward(count_level);
    while (next && !greater(next->value(), item)) {
      succ = next;
      next = next->forward(count_level);
      count += 1 << count_level;
    }
  }

  return std::make_pair(position, count);
}
template <class K>
bool contains(const K &item) const {
return this->find(item) != this->end();
@ -619,6 +719,7 @@ class SkipList : private Lockable<lock_t> {
return std::make_pair(rend(), false);
}
// TODO why are preds created here and not reused from accessor?
Node *preds[H];
find_path(item, preds);
return std::make_pair(ReverseIterator(this, preds[0], preds), true);
@ -673,11 +774,13 @@ class SkipList : private Lockable<lock_t> {
* towers that would link to the new tower. If nullptr, it is
* ignored.
* @param succs - Like preds, for successor nodes.
* @tparam K - type of item that must be comparable to the
* type of item <T> stored in the skiplist.
* @return - The height of the node already present in the
* skiplist, that matches the given item (is equal to it).
* Returns -1 if there is no matching item in the skiplist.
*/
template <class K>
template <typename K>
int find_path(const K &item, Node *preds[] = nullptr,
Node *succs[] = nullptr) const {
int level_found = -1;
@ -716,6 +819,11 @@ class SkipList : private Lockable<lock_t> {
// finds the max level of the skiplist based on the size (simple math).
auto level = static_cast<size_t>(std::round(std::log2(skiplist_size)));
// TODO
// inconsistent design, it seems that Accessor is trying to reuse nodes
// and pass the same ones to SkipList functions, why is this function
// doing it differently?
// also, why is 32 hardcoded?
Node *first_preds[32];
Node *second_preds[32];

View File

@ -0,0 +1,99 @@
#include <ctime>
#include <iostream>
#include <limits>
#include <vector>
#include <fmt/format.h>
#include "data_structures/concurrent/skiplist.hpp"
/**
 * Calculates the arithmetic mean of a given vector of numbers.
 *
 * The sum is accumulated and divided in floating point, so the
 * result is not truncated for integral inputs and negative sums
 * are not converted to an unsigned type by the division.
 *
 * @param values - the numbers to average.
 * @tparam TNumber - arithmetic element type.
 * @return The mean as a double, or 0.0 if `values` is empty.
 */
template <typename TNumber>
auto mean(const std::vector<TNumber> &values) {
  // avoid dividing by zero when there are no samples
  if (values.empty()) return 0.0;
  double sum = 0.0;
  for (const auto &value : values) sum += value;
  return sum / static_cast<double>(values.size());
}
/**
 * Logging helper function. Formats `args` into `format` using
 * fmt::format and writes the result to stdout as a single line.
 */
template <typename... TArgs>
void log(const std::string &format, TArgs &&... args) {
  const auto message = fmt::format(format, std::forward<TArgs>(args)...);
  std::cout << message << std::endl;
}
/**
 * Builds a skiplist holding every int in the range [0, size),
 * inserted in ascending order.
 */
std::unique_ptr<SkipList<int>> make_sl(int size) {
  auto list = std::make_unique<SkipList<int>>();
  auto accessor = list->access();
  int value = 0;
  while (value < size) {
    accessor.insert(value);
    value++;
  }
  return list;
}
/**
* Performs testing of the position_and_count function
* of a skiplist. Looks for three positions in the skiplist,
* those at 1/4, 1/2 and 3/4 values. Prints out results
* to stdout, does not do any automated checks if the
* results are valid.
*
* @param size - size of the skiplist to test with
* @param iterations - number of iterations of each test.
* @param granulation - How many sequential ints should be
* considered equal in testing by the custom `greater`
* function.
*/
void test(int size, int iterations = 20, int granulation = 1) {
auto greater = [granulation](const int &a, const int &b) {
return a / granulation > b / granulation;
};
log("\nTesting skiplist size {} with granulation {}", size, granulation);
// test at 1/4, 1/2 and 3/4 points
std::vector<int> positions({size / 4, size / 2, size * 3 / 4});
std::vector<std::vector<int>> less(3);
std::vector<std::vector<int>> equal(3);
std::vector<std::vector<double>> time(3);
for (int iteration = 0; iteration < iterations; iteration++) {
auto sl = make_sl(size);
for (auto pos : {0, 1, 2}) {
clock_t start_time = clock();
auto pos_and_count =
sl->access().position_and_count(positions[pos], greater);
auto t = double(clock() - start_time) / CLOCKS_PER_SEC;
less[pos].push_back(pos_and_count.first);
equal[pos].push_back(pos_and_count.second);
time[pos].push_back(t);
}
}
// convert values to errors
for (auto pos : {0, 1, 2}) {
auto position = positions[pos];
log("\tPosition {}", position);
for (auto &less_elem : less[pos])
less_elem = std::abs(less_elem - position);
log("\t\tMean position error: {}", mean(less[pos]));
for (auto &equal_elem : equal[pos])
equal_elem = std::abs(equal_elem - granulation);
log("\t\tMean count error: {}", mean(equal[pos]));
log("\t\tMean time (ms): {}", mean(time[pos]) * 1000);
}
}
/**
 * Runs the position_and_count test for a series of granulations.
 *
 * Optional command line arguments:
 *   argv[1] - skiplist size (default 1000)
 *   argv[2] - number of iterations per test (default 10)
 *
 * Granulations are 1, 100, 10000, ... for as long as they
 * are smaller than the skiplist size.
 */
int main(int argc, char *argv[]) {
  log("Skiplist position and count testing");

  int size = 1000;
  int iterations = 10;
  if (argc > 1) size = std::stoi(argv[1]);
  if (argc > 2) iterations = std::stoi(argv[2]);

  constexpr int kGranulationStep = 100;
  std::vector<int> granulations;
  for (int i = 1; i < size; i *= kGranulationStep) {
    granulations.push_back(i);
    // stop before the multiplication above would overflow int,
    // which is undefined behaviour for very large sizes
    if (i > std::numeric_limits<int>::max() / kGranulationStep) break;
  }

  for (auto granulation : granulations) test(size, iterations, granulation);
  return 0;
}