Add concurrent benchmarks for skiplist.
Summary: Skiplist benchmark and refactor testing infrastructure. Reviewers: mferencevic, buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D103
This commit is contained in:
parent
ffc977dbfc
commit
487f429d5b
@ -0,0 +1,95 @@
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "data_structures/concurrent/skiplist.hpp"
|
||||
|
||||
/**
|
||||
* Helper functions for skiplist. This functions are used to insert
|
||||
* concurrently into skiplist.
|
||||
*/
|
||||
class SkipListHelper {
|
||||
public:
|
||||
/**
|
||||
* Inserts into a skiplist concurrently. Tries to synchronize all threads to
|
||||
* start and end in the same time. This function should only be used to
|
||||
* benchmark skiplist in a way that doesn't give thread a chance to consume
|
||||
* more than the (end - start) / num_of_threads elements in the allocated
|
||||
* time, since that would make the measurement inaccurate. Also shuffles the
|
||||
* data to avoid consecutive value inserts.
|
||||
*
|
||||
* @param skiplist - skiplist instance
|
||||
* @param start - value_range start
|
||||
* @param end - value_range end (exclusive)
|
||||
* @param num_of_threads - number of threads to insert with
|
||||
* @param duration - duration of thread time in microseconds
|
||||
* @return number of inserted elements
|
||||
*/
|
||||
static int InsertConcurrentSkiplistTimed(
|
||||
SkipList<int> *skiplist, const int start, const int end,
|
||||
const int num_of_threads, const std::chrono::microseconds &duration) {
|
||||
std::vector<int> V(end - start);
|
||||
for (int i = start; i < end; ++i) V[i] = i;
|
||||
std::random_shuffle(V.begin(), V.end());
|
||||
std::vector<std::thread> threads;
|
||||
|
||||
std::atomic<bool> stopped{1};
|
||||
std::atomic<int> count{0};
|
||||
for (int i = 0; i < num_of_threads; ++i) {
|
||||
const int part = (end - start) / num_of_threads;
|
||||
threads.emplace_back(std::thread(
|
||||
[&V, &stopped, &count](SkipList<int> *skiplist, int start, int end) {
|
||||
while (stopped)
|
||||
;
|
||||
auto accessor = skiplist->access();
|
||||
for (int i = start; i < end && !stopped; ++i) {
|
||||
while (accessor.insert(V[i]).second == false)
|
||||
;
|
||||
++count;
|
||||
}
|
||||
},
|
||||
skiplist, start + i * part, start + (i + 1) * part));
|
||||
}
|
||||
stopped = false;
|
||||
std::this_thread::sleep_for(duration);
|
||||
stopped = true;
|
||||
for (auto &x : threads) x.join();
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert into skiplist concurrently. With the hardware maximal number of
|
||||
* threads an instance will allow.
|
||||
*
|
||||
* @param skiplist - skiplist instance
|
||||
* @param start - starting value to insert
|
||||
* @param end - ending value to insert
|
||||
*/
|
||||
static void InsertConcurrentSkiplist(SkipList<int> *skiplist, int start,
|
||||
int end) {
|
||||
int number_of_threads = std::thread::hardware_concurrency();
|
||||
std::vector<std::thread> threads;
|
||||
|
||||
for (int i = 0; i < number_of_threads; i++) {
|
||||
const int part = (end - start) / number_of_threads;
|
||||
threads.emplace_back(std::thread(
|
||||
[](SkipList<int> *skiplist, int start, int end) {
|
||||
auto accessor = skiplist->access();
|
||||
|
||||
for (; start < end; start++) {
|
||||
while (!accessor.insert(std::move(start)).second)
|
||||
;
|
||||
}
|
||||
},
|
||||
skiplist, start + i * part, start + (i + 1) * part));
|
||||
}
|
||||
|
||||
for (auto &thread : threads) thread.join();
|
||||
}
|
||||
|
||||
private:
|
||||
};
|
@ -0,0 +1,63 @@
|
||||
#ifndef NDEBUG
|
||||
#define NDEBUG
|
||||
#endif
|
||||
|
||||
#include <algorithm>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "benchmark/benchmark.h"
|
||||
#include "benchmark/benchmark_api.h"
|
||||
#include "data_structures/concurrent/skiplist.hpp"
|
||||
#include "logging/default.hpp"
|
||||
#include "logging/streams/stderr.hpp"
|
||||
#include "skiplist_helper.hpp"
|
||||
#include "utils/assert.hpp"
|
||||
|
||||
void Insert(benchmark::State& state) {
|
||||
SkipList<int> skiplist;
|
||||
while (state.KeepRunning()) {
|
||||
const int count = SkipListHelper::InsertConcurrentSkiplistTimed(
|
||||
&skiplist, 0, 10000000, state.range(1),
|
||||
std::chrono::microseconds(state.range(0)));
|
||||
state.SetItemsProcessed(count); // Number of processed items in one
|
||||
// iteration - useful for items/per s.
|
||||
state.SetIterationTime(state.range(0) * 1.0 /
|
||||
1000000); // Time the iteration took - since ideally
|
||||
// all threads should run and stop at the
|
||||
// same time we set the time manually.
|
||||
auto sl_access = skiplist.access();
|
||||
while (sl_access.size()) sl_access.remove(*sl_access.begin());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes the test function with two arguments, time and number of threads.
|
||||
* Time is specified in microseconds.
|
||||
*/
|
||||
static void CustomArguments(benchmark::internal::Benchmark* b) {
|
||||
for (int i = (1 << 18); i <= (1 << 20); i *= 2)
|
||||
for (int j = 1; j <= 8; ++j) b->Args({i, j});
|
||||
}
|
||||
|
||||
/**
|
||||
* This benchmark represents a use case of benchmarking one multi-threaded
|
||||
* concurrent structure. This test assumes that all threads will start and end
|
||||
* at the exact same time and will compete with each other.
|
||||
*/
|
||||
BENCHMARK(Insert)
|
||||
->Apply(CustomArguments) // Set custom arguments.
|
||||
->Unit(benchmark::kMicrosecond)
|
||||
->UseManualTime() // Don't calculate real-time but depend on function
|
||||
// providing the execution time.
|
||||
->Repetitions(3)
|
||||
->ReportAggregatesOnly(1);
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
logging::init_async();
|
||||
logging::log->pipe(std::make_unique<Stderr>());
|
||||
|
||||
::benchmark::Initialize(&argc, argv);
|
||||
::benchmark::RunSpecifiedBenchmarks();
|
||||
return 0;
|
||||
}
|
@ -14,41 +14,19 @@
|
||||
#include "data_structures/concurrent/skiplist.hpp"
|
||||
#include "logging/default.hpp"
|
||||
#include "logging/streams/stdout.hpp"
|
||||
#include "skiplist_helper.hpp"
|
||||
#include "utils/random/random_generator.hpp"
|
||||
|
||||
using utils::random::NumberGenerator;
|
||||
using IntegerGenerator = NumberGenerator<std::uniform_int_distribution<int>,
|
||||
std::default_random_engine, int>;
|
||||
|
||||
void InsertSkiplist(SkipList<int> *skiplist, int start, int end) {
|
||||
auto accessor = skiplist->access();
|
||||
|
||||
for (int start = 0; start < end; start++) {
|
||||
accessor.insert(std::move(start));
|
||||
}
|
||||
}
|
||||
|
||||
void InsertConcurrentSkiplist(SkipList<int> *skiplist, int start, int end) {
|
||||
int number_od_threads = std::thread::hardware_concurrency();
|
||||
std::vector<std::thread> threads(number_od_threads);
|
||||
|
||||
for (int i = 0; i < number_od_threads; i++) {
|
||||
int part = (end - start) / number_od_threads;
|
||||
threads[i] = std::thread(InsertSkiplist, skiplist, start + i * part,
|
||||
start + (i + 1) * part);
|
||||
}
|
||||
|
||||
for (int i = 0; i < number_od_threads; i++) {
|
||||
threads[i].join();
|
||||
}
|
||||
}
|
||||
|
||||
static void ReverseFromRBegin(benchmark::State &state) {
|
||||
while (state.KeepRunning()) {
|
||||
state.PauseTiming();
|
||||
|
||||
SkipList<int> skiplist;
|
||||
InsertConcurrentSkiplist(&skiplist, 0, state.range(0));
|
||||
SkipListHelper::InsertConcurrentSkiplist(&skiplist, 0, state.range(0));
|
||||
int counter = 10;
|
||||
|
||||
auto accessor = skiplist.access();
|
||||
@ -70,7 +48,7 @@ static void FindFromRBegin(benchmark::State &state) {
|
||||
state.PauseTiming();
|
||||
|
||||
SkipList<int> skiplist;
|
||||
InsertConcurrentSkiplist(&skiplist, 0, state.range(0));
|
||||
SkipListHelper::InsertConcurrentSkiplist(&skiplist, 0, state.range(0));
|
||||
int counter = 10;
|
||||
|
||||
auto accessor = skiplist.access();
|
||||
|
Loading…
Reference in New Issue
Block a user