Available memory logging, utils/sysinfo/memory cleanup, tests/concurrent cleanup.

Summary:
Added a warning to the log. Every 3 seconds (let's not make that configurable). Can be turned off.

I still don't like this. We are raising another thread and reading a file to do monitoring. We're developing a DB, not a sys monitor. Serious admins do that themselves. But, here it is.

UPDATE:
Cleaned up `utils/sysinfo/memory`. Removed all unused functions. Removed the faulty memory check in `tests/concurrent`.

Reviewers: buda, mferencevic, mislav.bradac

Reviewed By: mislav.bradac

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D819
This commit is contained in:
florijan 2017-09-22 14:24:53 +02:00
parent 4e4fdd8029
commit b2f3dc1916
16 changed files with 436 additions and 512 deletions

View File

@ -19,6 +19,7 @@
* Use \u to specify 4 digit codepoint and \U for 8 digit * Use \u to specify 4 digit codepoint and \U for 8 digit
* Keywords appearing in header (named expressions) keep original case. * Keywords appearing in header (named expressions) keep original case.
* Our Bolt protocol implementation is now completely compatible with the protocol version 1 specification. (https://boltprotocol.org/v1/) * Our Bolt protocol implementation is now completely compatible with the protocol version 1 specification. (https://boltprotocol.org/v1/)
* Added a log warning when running out of memory and the `memory_warning_threshold` flag
## v0.7.0 ## v0.7.0

View File

@ -72,8 +72,9 @@ parameters:
--snapshot-on-exit | bool | false | Make a snapshot when closing Memgraph. --snapshot-on-exit | bool | false | Make a snapshot when closing Memgraph.
--snapshot-recover-on-startup | bool | false | Recover the database on startup using the last<br/>stored snapshot. --snapshot-recover-on-startup | bool | false | Recover the database on startup using the last<br/>stored snapshot.
--query-execution-time-sec | integer | 180 | Maximum allowed query execution time. <br/>Queries exceeding this limit will be aborted. Value of -1 means no limit. --query-execution-time-sec | integer | 180 | Maximum allowed query execution time. <br/>Queries exceeding this limit will be aborted. Value of -1 means no limit.
--memory-warning-threshold | integer | 1024 | Memory warning threshold, in MB. If Memgraph detects there is less available RAM available it will log a warning. Set to 0 to disable.
[^1]: Maximum number of concurrent executions on the current CPU.
[^1]: Maximum number of concurrent executions on the current CPU.
To find more about how to execute queries on Memgraph please proceed to To find more about how to execute queries on Memgraph please proceed to
[Quick Start](quick-start.md). [Quick Start](quick-start.md).

View File

@ -12,8 +12,10 @@
#include "io/network/socket.hpp" #include "io/network/socket.hpp"
#include "utils/flag_validation.hpp" #include "utils/flag_validation.hpp"
#include "utils/scheduler.hpp"
#include "utils/signals/handler.hpp" #include "utils/signals/handler.hpp"
#include "utils/stacktrace.hpp" #include "utils/stacktrace.hpp"
#include "utils/sysinfo/memory.hpp"
#include "utils/terminate_handler.hpp" #include "utils/terminate_handler.hpp"
namespace fs = std::experimental::filesystem; namespace fs = std::experimental::filesystem;
@ -34,6 +36,10 @@ DEFINE_VALIDATED_int32(num_workers,
"Number of workers", FLAG_IN_RANGE(1, INT32_MAX)); "Number of workers", FLAG_IN_RANGE(1, INT32_MAX));
DEFINE_string(log_file, "memgraph.log", DEFINE_string(log_file, "memgraph.log",
"Path to where the log should be stored."); "Path to where the log should be stored.");
DEFINE_uint64(
memory_warning_threshold, 1024,
"Memory warning treshold, in MB. If Memgraph detects there is less available "
"RAM available it will log a warning. Set to 0 to disable.");
// Load flags in this order, the last one has the highest priority: // Load flags in this order, the last one has the highest priority:
// 1) /etc/memgraph/config // 1) /etc/memgraph/config
@ -143,6 +149,17 @@ int main(int argc, char **argv) {
SignalHandler::register_handler(Signal::Interupt, SignalHandler::register_handler(Signal::Interupt,
[&server]() { server.Shutdown(); }); [&server]() { server.Shutdown(); });
// Start memory warning logger.
Scheduler mem_log_scheduler;
if (FLAGS_memory_warning_threshold > 0) {
mem_log_scheduler.Run(std::chrono::seconds(3), [] {
auto free_ram_mb = utils::AvailableMem() / 1024;
if (free_ram_mb < FLAGS_memory_warning_threshold)
LOG(WARNING) << "Running out of available RAM, only " << free_ram_mb
<< " MB left.";
});
}
// Start worker threads. // Start worker threads.
server.Start(FLAGS_num_workers); server.Start(FLAGS_num_workers);

View File

@ -1,60 +1,27 @@
#pragma mark #include <fstream>
#include <iostream>
#include <limits>
#include <cstdio> namespace utils {
#include <cstdlib>
#include <cstring>
#include "sys/sysinfo.h"
#include "sys/types.h"
auto total_virtual_memory() {
struct sysinfo mem_info;
sysinfo(&mem_info);
long long total_virtual_memory = mem_info.totalram;
total_virtual_memory += mem_info.totalswap;
total_virtual_memory *= mem_info.mem_unit;
return total_virtual_memory;
}
auto used_virtual_memory() {
struct sysinfo mem_info;
sysinfo(&mem_info);
long long virtual_memory_used = mem_info.totalram - mem_info.freeram;
virtual_memory_used += mem_info.totalswap - mem_info.freeswap;
virtual_memory_used *= mem_info.mem_unit;
return virtual_memory_used;
}
// TODO: OS dependent
/** /**
* parses memory line from /proc/self/status * Gets the amount of available RAM in kilobytes. If the information is
* unavalable zero is returned.
*/ */
auto parse_vm_size(char *line) { inline auto AvailableMem() {
// This assumes that a digit will be found and the line ends in " Kb". std::string token;
auto i = std::strlen(line); std::ifstream meminfo("/proc/meminfo");
const char *p = line; while (meminfo >> token) {
while (*p < '0' || *p > '9') p++; if (token == "MemAvailable:") {
line[i - 3] = '\0'; unsigned long mem;
return std::atoll(p); if (meminfo >> mem) {
} return mem;
} else {
/** return 0UL;
* returns VmSize in kB }
*/
auto vm_size() {
std::FILE *file = std::fopen("/proc/self/status", "r");
auto result = -1LL;
char line[128];
while (fgets(line, 128, file) != NULL) {
if (strncmp(line, "VmSize:", 7) == 0) {
result = parse_vm_size(line);
break;
} }
meminfo.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
} }
return 0UL;
fclose(file);
return result;
} }
} // namespace utils

View File

@ -12,7 +12,6 @@
#include "data_structures/concurrent/concurrent_set.hpp" #include "data_structures/concurrent/concurrent_set.hpp"
#include "data_structures/concurrent/skiplist.hpp" #include "data_structures/concurrent/skiplist.hpp"
#include "utils/assert.hpp" #include "utils/assert.hpp"
#include "utils/sysinfo/memory.hpp"
// NOTE: this file is highly coupled to data_structures // NOTE: this file is highly coupled to data_structures
// TODO: REFACTOR // TODO: REFACTOR
@ -195,34 +194,3 @@ auto insert_try(typename S::Accessor &acc, long long &downcount,
} }
}; };
} }
// Performs memory check to determine if memory usage before calling given
// function
// is aproximately equal to memory usage after function. Memory usage is thread
// senstive so no_threads spawned in function is necessary.
void memory_check(size_t no_threads, std::function<void()> f) {
DLOG(INFO) << fmt::format("Number of threads: {}", no_threads);
// TODO: replace vm_size with something more appropriate
// the past implementation was teribble wrong
// to that ASAP
// OR
// use custom allocation wrapper
// OR
// user Boost.Test
auto start = vm_size();
DLOG(INFO) << fmt::format("Memory check (used memory at the beginning): {}",
start);
f();
auto end = vm_size();
DLOG(INFO) << fmt::format("Memory check (used memory at the end): {}", end);
long long delta = end - start;
DLOG(INFO) << fmt::format("Delta: {}", delta);
// TODO: do memory check somehow
// the past implementation was wrong
permanent_assert(true, "Memory leak");
}

View File

@ -14,65 +14,63 @@ constexpr size_t no_insert_for_one_delete = 1;
// no_find_per_change and no_insert_for_one_delete. // no_find_per_change and no_insert_for_one_delete.
int main(int argc, char **argv) { int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
memory_check(THREADS_NO, [] { ConcurrentList<std::pair<int, int>> list;
ConcurrentList<std::pair<int, int>> list; permanent_assert(list.size() == 0, "The list isn't empty");
permanent_assert(list.size() == 0, "The list isn't empty");
auto futures = run<std::pair<long long, long long>>( auto futures =
THREADS_NO, [&](auto index) mutable { run<std::pair<long long, long long>>(THREADS_NO, [&](auto index) mutable {
auto rand = rand_gen(key_range); auto rand = rand_gen(key_range);
auto rand_change = rand_gen_bool(no_find_per_change); auto rand_change = rand_gen_bool(no_find_per_change);
auto rand_delete = rand_gen_bool(no_insert_for_one_delete); auto rand_delete = rand_gen_bool(no_insert_for_one_delete);
long long sum = 0; long long sum = 0;
long long count = 0; long long count = 0;
for (int i = 0; i < op_per_thread; i++) { for (int i = 0; i < op_per_thread; i++) {
auto num = rand(); auto num = rand();
auto data = num % max_number; auto data = num % max_number;
if (rand_change()) { if (rand_change()) {
if (rand_delete()) { if (rand_delete()) {
for (auto it = list.begin(); it != list.end(); it++) { for (auto it = list.begin(); it != list.end(); it++) {
if (it->first == num) { if (it->first == num) {
if (it.remove()) { if (it.remove()) {
sum -= data; sum -= data;
count--; count--;
}
break;
} }
}
} else {
list.begin().push(std::make_pair(num, data));
sum += data;
count++;
}
} else {
for (auto &v : list) {
if (v.first == num) {
permanent_assert(v.second == data, "Data is invalid");
break; break;
} }
} }
} else {
list.begin().push(std::make_pair(num, data));
sum += data;
count++;
}
} else {
for (auto &v : list) {
if (v.first == num) {
permanent_assert(v.second == data, "Data is invalid");
break;
}
} }
} }
}
return std::pair<long long, long long>(sum, count); return std::pair<long long, long long>(sum, count);
}); });
auto it = list.begin(); auto it = list.begin();
long long sums = 0; long long sums = 0;
long long counters = 0; long long counters = 0;
for (auto &data : collect(futures)) { for (auto &data : collect(futures)) {
sums += data.second.first; sums += data.second.first;
counters += data.second.second; counters += data.second.second;
} }
for (auto &e : list) { for (auto &e : list) {
sums -= e.second; sums -= e.second;
} }
permanent_assert(sums == 0, "Same values aren't present"); permanent_assert(sums == 0, "Same values aren't present");
check_size_list<ConcurrentList<std::pair<int, int>>>(list, counters); check_size_list<ConcurrentList<std::pair<int, int>>>(list, counters);
std::this_thread::sleep_for(1s); std::this_thread::sleep_for(1s);
});
} }

View File

@ -11,32 +11,29 @@ constexpr size_t key_range = elems_per_thread * THREADS_NO * 2;
// Test checks for missing data and changed/overwriten data. // Test checks for missing data and changed/overwriten data.
int main(int argc, char **argv) { int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
map_t skiplist;
memory_check(THREADS_NO, [] { auto futures =
map_t skiplist; run<std::vector<size_t>>(THREADS_NO, skiplist, [](auto acc, auto index) {
auto rand = rand_gen(key_range);
long long downcount = elems_per_thread;
std::vector<size_t> owned;
auto inserter =
insert_try<size_t, size_t, map_t>(acc, downcount, owned);
auto futures = run<std::vector<size_t>>( do {
THREADS_NO, skiplist, [](auto acc, auto index) { inserter(rand(), index);
auto rand = rand_gen(key_range); } while (downcount > 0);
long long downcount = elems_per_thread;
std::vector<size_t> owned;
auto inserter =
insert_try<size_t, size_t, map_t>(acc, downcount, owned);
do { check_present_same<map_t>(acc, index, owned);
inserter(rand(), index); return owned;
} while (downcount > 0); });
check_present_same<map_t>(acc, index, owned); auto accessor = skiplist.access();
return owned; for (auto &owned : collect(futures)) {
}); check_present_same<map_t>(accessor, owned);
}
auto accessor = skiplist.access(); check_size<map_t>(accessor, THREADS_NO * elems_per_thread);
for (auto &owned : collect(futures)) { check_order<map_t>(accessor);
check_present_same<map_t>(accessor, owned);
}
check_size<map_t>(accessor, THREADS_NO * elems_per_thread);
check_order<map_t>(accessor);
});
} }

View File

@ -11,33 +11,30 @@ constexpr size_t elems_per_thread = 100000;
// Test checks for missing data and changed/overwriten data. // Test checks for missing data and changed/overwriten data.
int main(int argc, char **argv) { int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
map_t skiplist;
memory_check(THREADS_NO, [] { auto futures =
map_t skiplist; run<std::vector<size_t>>(THREADS_NO, skiplist, [](auto acc, auto index) {
long long downcount = elems_per_thread;
auto futures = run<std::vector<size_t>>( std::vector<size_t> owned;
THREADS_NO, skiplist, [](auto acc, auto index) { auto inserter =
long long downcount = elems_per_thread; insert_try<size_t, size_t, map_t>(acc, downcount, owned);
std::vector<size_t> owned;
auto inserter =
insert_try<size_t, size_t, map_t>(acc, downcount, owned);
#pragma GCC diagnostic push #pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wfor-loop-analysis" #pragma GCC diagnostic ignored "-Wfor-loop-analysis"
for (int i = 0; downcount > 0; i++) { for (int i = 0; downcount > 0; i++) {
inserter(i, index); inserter(i, index);
} }
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
check_present_same<map_t>(acc, index, owned); check_present_same<map_t>(acc, index, owned);
return owned; return owned;
}); });
auto accessor = skiplist.access(); auto accessor = skiplist.access();
for (auto &owned : collect(futures)) { for (auto &owned : collect(futures)) {
check_present_same<map_t>(accessor, owned); check_present_same<map_t>(accessor, owned);
} }
check_size<map_t>(accessor, THREADS_NO * elems_per_thread); check_size<map_t>(accessor, THREADS_NO * elems_per_thread);
check_order<map_t>(accessor); check_order<map_t>(accessor);
});
} }

View File

@ -7,71 +7,69 @@ constexpr size_t elems_per_thread = 1e5;
int main(int, char **argv) { int main(int, char **argv) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
memory_check(THREADS_NO, [&] { std::vector<std::thread> threads;
std::vector<std::thread> threads; map_t skiplist;
map_t skiplist;
// put THREADS_NO * elems_per_thread items to the skiplist // put THREADS_NO * elems_per_thread items to the skiplist
for (size_t thread_i = 0; thread_i < THREADS_NO; ++thread_i) { for (size_t thread_i = 0; thread_i < THREADS_NO; ++thread_i) {
threads.emplace_back( threads.emplace_back(
[&skiplist](size_t start, size_t end) { [&skiplist](size_t start, size_t end) {
auto accessor = skiplist.access(); auto accessor = skiplist.access();
for (size_t elem_i = start; elem_i < end; ++elem_i) { for (size_t elem_i = start; elem_i < end; ++elem_i) {
accessor.insert(elem_i, elem_i); accessor.insert(elem_i, elem_i);
} }
}, },
thread_i * elems_per_thread, thread_i * elems_per_thread,
thread_i * elems_per_thread + elems_per_thread); thread_i * elems_per_thread + elems_per_thread);
} }
// wait all threads // wait all threads
for (auto &thread : threads) { for (auto &thread : threads) {
thread.join(); thread.join();
} }
// get skiplist size // get skiplist size
{ {
auto accessor = skiplist.access(); auto accessor = skiplist.access();
permanent_assert(accessor.size() == THREADS_NO * elems_per_thread, permanent_assert(accessor.size() == THREADS_NO * elems_per_thread,
"all elements in skiplist"); "all elements in skiplist");
} }
for (size_t thread_i = 0; thread_i < THREADS_NO; ++thread_i) { for (size_t thread_i = 0; thread_i < THREADS_NO; ++thread_i) {
threads[thread_i] = std::thread( threads[thread_i] = std::thread(
[&skiplist](size_t start, size_t end) { [&skiplist](size_t start, size_t end) {
auto accessor = skiplist.access(); auto accessor = skiplist.access();
for (size_t elem_i = start; elem_i < end; ++elem_i) { for (size_t elem_i = start; elem_i < end; ++elem_i) {
permanent_assert(accessor.remove(elem_i) == true, ""); permanent_assert(accessor.remove(elem_i) == true, "");
} }
}, },
thread_i * elems_per_thread, thread_i * elems_per_thread,
thread_i * elems_per_thread + elems_per_thread); thread_i * elems_per_thread + elems_per_thread);
} }
// // wait all threads // // wait all threads
for (auto &thread : threads) { for (auto &thread : threads) {
thread.join(); thread.join();
} }
// check size // check size
{ {
auto accessor = skiplist.access(); auto accessor = skiplist.access();
permanent_assert(accessor.size() == 0, "Size should be 0, but size is " permanent_assert(accessor.size() == 0, "Size should be 0, but size is "
<< accessor.size()); << accessor.size());
} }
// check count // check count
{ {
size_t iterator_counter = 0; size_t iterator_counter = 0;
auto accessor = skiplist.access(); auto accessor = skiplist.access();
for (auto elem : accessor) { for (auto elem : accessor) {
++iterator_counter; ++iterator_counter;
cout << elem.first << " "; cout << elem.first << " ";
}
permanent_assert(iterator_counter == 0, "deleted elements");
} }
permanent_assert(iterator_counter == 0, "deleted elements");
}
{ {
auto accessor = skiplist.access(); auto accessor = skiplist.access();
check_order<map_t>(accessor); check_order<map_t>(accessor);
} }
});
} }

View File

@ -9,19 +9,16 @@ constexpr size_t elements = 2e6;
*/ */
int main(int argc, char **argv) { int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
map_t skiplist;
memory_check(THREADS_NO, [] { auto futures = run<size_t>(THREADS_NO, skiplist, [](auto acc, auto index) {
map_t skiplist; for (size_t i = 0; i < elements; i++) {
acc.insert(i, index);
auto futures = run<size_t>(THREADS_NO, skiplist, [](auto acc, auto index) { }
for (size_t i = 0; i < elements; i++) { return index;
acc.insert(i, index);
}
return index;
});
collect(futures);
auto accessor = skiplist.access();
check_size<map_t>(accessor, elements);
}); });
collect(futures);
auto accessor = skiplist.access();
check_size<map_t>(accessor, elements);
} }

View File

@ -4,7 +4,7 @@ constexpr size_t THREADS_NO = std::min(max_no_threads, 1);
constexpr size_t elems_per_thread = 16e5; constexpr size_t elems_per_thread = 16e5;
// TODO: Memory leak at 1,600,000 elements (Kruno wrote this here but // TODO: Memory leak at 1,600,000 elements (Kruno wrote this here but
// the memory_check method had invalid implementation) // the previous (now deleted) memory_check method had invalid implementation)
// 1. implement valid memory_check // 1. implement valid memory_check
// 2. analyse this code // 2. analyse this code
// 3. fix the memory leak // 3. fix the memory leak
@ -12,66 +12,64 @@ constexpr size_t elems_per_thread = 16e5;
int main(int, char **argv) { int main(int, char **argv) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
memory_check(THREADS_NO, [&] { std::vector<std::thread> threads;
std::vector<std::thread> threads; map_t skiplist;
map_t skiplist;
// put THREADS_NO * elems_per_thread items to the skiplist // put THREADS_NO * elems_per_thread items to the skiplist
for (size_t thread_i = 0; thread_i < THREADS_NO; ++thread_i) { for (size_t thread_i = 0; thread_i < THREADS_NO; ++thread_i) {
threads.emplace_back( threads.emplace_back(
[&skiplist](size_t start, size_t end) { [&skiplist](size_t start, size_t end) {
auto accessor = skiplist.access(); auto accessor = skiplist.access();
for (size_t elem_i = start; elem_i < end; ++elem_i) { for (size_t elem_i = start; elem_i < end; ++elem_i) {
accessor.insert(elem_i, elem_i); accessor.insert(elem_i, elem_i);
} }
}, },
thread_i * elems_per_thread, thread_i * elems_per_thread,
thread_i * elems_per_thread + elems_per_thread); thread_i * elems_per_thread + elems_per_thread);
} }
// wait all threads // wait all threads
for (auto &thread : threads) { for (auto &thread : threads) {
thread.join(); thread.join();
} }
// get skiplist size // get skiplist size
{ {
auto accessor = skiplist.access(); auto accessor = skiplist.access();
permanent_assert(accessor.size() == THREADS_NO * elems_per_thread, permanent_assert(accessor.size() == THREADS_NO * elems_per_thread,
"all elements in skiplist"); "all elements in skiplist");
} }
for (size_t thread_i = 0; thread_i < THREADS_NO; ++thread_i) { for (size_t thread_i = 0; thread_i < THREADS_NO; ++thread_i) {
threads[thread_i] = std::thread( threads[thread_i] = std::thread(
[&skiplist](size_t start, size_t end) { [&skiplist](size_t start, size_t end) {
auto accessor = skiplist.access(); auto accessor = skiplist.access();
for (size_t elem_i = start; elem_i < end; ++elem_i) { for (size_t elem_i = start; elem_i < end; ++elem_i) {
permanent_assert(accessor.remove(elem_i) == true, ""); permanent_assert(accessor.remove(elem_i) == true, "");
} }
}, },
thread_i * elems_per_thread, thread_i * elems_per_thread,
thread_i * elems_per_thread + elems_per_thread); thread_i * elems_per_thread + elems_per_thread);
} }
// // wait all threads // // wait all threads
for (auto &thread : threads) { for (auto &thread : threads) {
thread.join(); thread.join();
} }
// check size // check size
{ {
auto accessor = skiplist.access(); auto accessor = skiplist.access();
permanent_assert(accessor.size() == 0, "Size should be 0, but size is " permanent_assert(accessor.size() == 0, "Size should be 0, but size is "
<< accessor.size()); << accessor.size());
} }
// check count // check count
{ {
size_t iterator_counter = 0; size_t iterator_counter = 0;
auto accessor = skiplist.access(); auto accessor = skiplist.access();
for (auto elem : accessor) { for (auto elem : accessor) {
++iterator_counter; ++iterator_counter;
cout << elem.first << " "; cout << elem.first << " ";
}
permanent_assert(iterator_counter == 0, "deleted elements");
} }
}); permanent_assert(iterator_counter == 0, "deleted elements");
}
} }

View File

@ -14,54 +14,51 @@ constexpr size_t no_insert_for_one_delete = 2;
// Calls of remove method are interleaved with insert calls. // Calls of remove method are interleaved with insert calls.
int main(int argc, char **argv) { int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
map_t skiplist;
memory_check(THREADS_NO, [] { auto futures = run<std::pair<long long, long long>>(
map_t skiplist; THREADS_NO, skiplist, [](auto acc, auto index) {
auto rand_op = rand_gen_bool(no_insert_for_one_delete);
long long downcount = op_per_thread;
long long sum = 0;
long long count = 0;
auto futures = run<std::pair<long long, long long>>( for (int i = 0; downcount > 0; i++) {
THREADS_NO, skiplist, [](auto acc, auto index) { auto data = i % max_number;
auto rand_op = rand_gen_bool(no_insert_for_one_delete); if (rand_op()) {
long long downcount = op_per_thread; auto t = i;
long long sum = 0; while (t > 0) {
long long count = 0; if (acc.remove(t)) {
sum -= t % max_number;
for (int i = 0; downcount > 0; i++) {
auto data = i % max_number;
if (rand_op()) {
auto t = i;
while (t > 0) {
if (acc.remove(t)) {
sum -= t % max_number;
downcount--;
count--;
break;
}
t--;
}
} else {
if (acc.insert(i, data).second) {
sum += data;
count++;
downcount--; downcount--;
count--;
break;
} }
t--;
}
} else {
if (acc.insert(i, data).second) {
sum += data;
count++;
downcount--;
} }
} }
return std::pair<long long, long long>(sum, count); }
}); return std::pair<long long, long long>(sum, count);
});
auto accessor = skiplist.access(); auto accessor = skiplist.access();
long long sums = 0; long long sums = 0;
long long counters = 0; long long counters = 0;
for (auto &data : collect(futures)) { for (auto &data : collect(futures)) {
sums += data.second.first; sums += data.second.first;
counters += data.second.second; counters += data.second.second;
} }
for (auto &e : accessor) { for (auto &e : accessor) {
sums -= e.second; sums -= e.second;
} }
permanent_assert(sums == 0, "Aproximetly Same values are present"); permanent_assert(sums == 0, "Aproximetly Same values are present");
check_size<map_t>(accessor, counters); check_size<map_t>(accessor, counters);
check_order<map_t>(accessor); check_order<map_t>(accessor);
});
} }

View File

@ -12,41 +12,38 @@ constexpr size_t no_insert_for_one_delete = 1;
// Calls of remove method are interleaved with insert calls. // Calls of remove method are interleaved with insert calls.
int main(int argc, char **argv) { int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
map_t skiplist;
memory_check(THREADS_NO, [] { auto futures =
map_t skiplist; run<std::vector<size_t>>(THREADS_NO, skiplist, [](auto acc, auto index) {
auto rand = rand_gen(key_range);
auto rand_op = rand_gen_bool(no_insert_for_one_delete);
long long downcount = op_per_thread;
std::vector<size_t> owned;
auto inserter =
insert_try<size_t, size_t, map_t>(acc, downcount, owned);
auto futures = run<std::vector<size_t>>( do {
THREADS_NO, skiplist, [](auto acc, auto index) { if (owned.size() != 0 && rand_op()) {
auto rand = rand_gen(key_range); auto rem = rand() % owned.size();
auto rand_op = rand_gen_bool(no_insert_for_one_delete); permanent_assert(acc.remove(owned[rem]), "Owned data removed");
long long downcount = op_per_thread; owned.erase(owned.begin() + rem);
std::vector<size_t> owned; downcount--;
auto inserter = } else {
insert_try<size_t, size_t, map_t>(acc, downcount, owned); inserter(rand(), index);
}
} while (downcount > 0);
do { check_present_same<map_t>(acc, index, owned);
if (owned.size() != 0 && rand_op()) { return owned;
auto rem = rand() % owned.size(); });
permanent_assert(acc.remove(owned[rem]), "Owned data removed");
owned.erase(owned.begin() + rem);
downcount--;
} else {
inserter(rand(), index);
}
} while (downcount > 0);
check_present_same<map_t>(acc, index, owned); auto accessor = skiplist.access();
return owned; size_t count = 0;
}); for (auto &owned : collect(futures)) {
check_present_same<map_t>(accessor, owned);
auto accessor = skiplist.access(); count += owned.second.size();
size_t count = 0; }
for (auto &owned : collect(futures)) { check_size<map_t>(accessor, count);
check_present_same<map_t>(accessor, owned); check_order<map_t>(accessor);
count += owned.second.size();
}
check_size<map_t>(accessor, count);
check_order<map_t>(accessor);
});
} }

View File

@ -14,52 +14,49 @@ constexpr size_t no_insert_for_one_delete = 2;
// Calls of remove method are interleaved with insert calls. // Calls of remove method are interleaved with insert calls.
int main(int argc, char **argv) { int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
map_t skiplist;
memory_check(THREADS_NO, [] { auto futures = run<std::pair<long long, long long>>(
map_t skiplist; THREADS_NO, skiplist, [](auto acc, auto index) {
auto rand = rand_gen(key_range);
auto rand_op = rand_gen_bool(no_insert_for_one_delete);
long long downcount = op_per_thread;
long long sum = 0;
long long count = 0;
auto futures = run<std::pair<long long, long long>>( do {
THREADS_NO, skiplist, [](auto acc, auto index) { auto num = rand();
auto rand = rand_gen(key_range); auto data = num % max_number;
auto rand_op = rand_gen_bool(no_insert_for_one_delete); if (rand_op()) {
long long downcount = op_per_thread; if (acc.remove(num)) {
long long sum = 0; sum -= data;
long long count = 0; downcount--;
count--;
do {
auto num = rand();
auto data = num % max_number;
if (rand_op()) {
if (acc.remove(num)) {
sum -= data;
downcount--;
count--;
}
} else {
if (acc.insert(num, data).second) {
sum += data;
downcount--;
count++;
}
} }
} while (downcount > 0); } else {
if (acc.insert(num, data).second) {
sum += data;
downcount--;
count++;
}
}
} while (downcount > 0);
return std::pair<long long, long long>(sum, count); return std::pair<long long, long long>(sum, count);
}); });
auto accessor = skiplist.access(); auto accessor = skiplist.access();
long long sums = 0; long long sums = 0;
long long counters = 0; long long counters = 0;
for (auto &data : collect(futures)) { for (auto &data : collect(futures)) {
sums += data.second.first; sums += data.second.first;
counters += data.second.second; counters += data.second.second;
} }
for (auto &e : accessor) { for (auto &e : accessor) {
sums -= e.second; sums -= e.second;
} }
permanent_assert(sums == 0, "Aproximetly Same values are present"); permanent_assert(sums == 0, "Aproximetly Same values are present");
check_size<map_t>(accessor, counters); check_size<map_t>(accessor, counters);
check_order<map_t>(accessor); check_order<map_t>(accessor);
});
} }

View File

@ -12,55 +12,52 @@ constexpr size_t no_insert_for_one_delete = 2;
// Calls of remove method are interleaved with insert calls. // Calls of remove method are interleaved with insert calls.
int main(int argc, char **argv) { int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
ConcurrentSet<std::string> skiplist;
memory_check(THREADS_NO, [] { auto futures =
ConcurrentSet<std::string> skiplist; run<std::vector<long>>(THREADS_NO, skiplist, [](auto acc, auto index) {
auto rand = rand_gen(key_range);
auto rand_op = rand_gen_bool(no_insert_for_one_delete);
long long downcount = op_per_thread;
std::vector<long> set(key_range);
auto futures = do {
run<std::vector<long>>(THREADS_NO, skiplist, [](auto acc, auto index) { int num = rand();
auto rand = rand_gen(key_range); std::string num_str = std::to_string(num);
auto rand_op = rand_gen_bool(no_insert_for_one_delete); if (rand_op()) {
long long downcount = op_per_thread; if (acc.remove(num_str)) {
std::vector<long> set(key_range); downcount--;
set[num]--;
do {
int num = rand();
std::string num_str = std::to_string(num);
if (rand_op()) {
if (acc.remove(num_str)) {
downcount--;
set[num]--;
}
} else {
std::string num_str = std::to_string(num);
if (acc.insert(num_str).second) {
downcount--;
set[num]++;
}
} }
} while (downcount > 0); } else {
std::string num_str = std::to_string(num);
if (acc.insert(num_str).second) {
downcount--;
set[num]++;
}
}
} while (downcount > 0);
return set; return set;
}); });
long set[key_range] = {0}; long set[key_range] = {0};
for (auto &data : collect(futures)) { for (auto &data : collect(futures)) {
for (int i = 0; i < key_range; i++) {
set[i] += data.second[i];
}
}
auto accessor = skiplist.access();
for (int i = 0; i < key_range; i++) { for (int i = 0; i < key_range; i++) {
permanent_assert(set[i] == 0 || set[i] == 1 || set[i] += data.second[i];
(set[i] == 1) ^ accessor.contains(std::to_string(i)),
"Set doesn't hold it's guarantees.");
} }
}
for (auto &e : accessor) { auto accessor = skiplist.access();
set[std::stoi(e)]--; for (int i = 0; i < key_range; i++) {
} permanent_assert(set[i] == 0 || set[i] == 1 ||
(set[i] == 1) ^ accessor.contains(std::to_string(i)),
"Set doesn't hold it's guarantees.");
}
check_zero(key_range, set, "Set"); for (auto &e : accessor) {
}); set[std::stoi(e)]--;
}
check_zero(key_range, set, "Set");
} }

View File

@ -16,56 +16,53 @@ constexpr size_t no_insert_for_one_delete = 1;
// no_find_per_change and no_insert_for_one_delete. // no_find_per_change and no_insert_for_one_delete.
int main(int argc, char **argv) { int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
map_t skiplist;
memory_check(THREADS_NO, [] { auto futures = run<std::pair<long long, long long>>(
map_t skiplist; THREADS_NO, skiplist, [](auto acc, auto index) {
auto rand = rand_gen(key_range);
auto rand_change = rand_gen_bool(no_find_per_change);
auto rand_delete = rand_gen_bool(no_insert_for_one_delete);
long long sum = 0;
long long count = 0;
auto futures = run<std::pair<long long, long long>>( for (int i = 0; i < op_per_thread; i++) {
THREADS_NO, skiplist, [](auto acc, auto index) { auto num = rand();
auto rand = rand_gen(key_range); auto data = num % max_number;
auto rand_change = rand_gen_bool(no_find_per_change); if (rand_change()) {
auto rand_delete = rand_gen_bool(no_insert_for_one_delete); if (rand_delete()) {
long long sum = 0; if (acc.remove(num)) {
long long count = 0; sum -= data;
count--;
for (int i = 0; i < op_per_thread; i++) {
auto num = rand();
auto data = num % max_number;
if (rand_change()) {
if (rand_delete()) {
if (acc.remove(num)) {
sum -= data;
count--;
}
} else {
if (acc.insert(num, data).second) {
sum += data;
count++;
}
} }
} else { } else {
auto value = acc.find(num); if (acc.insert(num, data).second) {
permanent_assert(value == acc.end() || value->second == data, sum += data;
"Data is invalid"); count++;
}
} }
} else {
auto value = acc.find(num);
permanent_assert(value == acc.end() || value->second == data,
"Data is invalid");
} }
}
return std::pair<long long, long long>(sum, count); return std::pair<long long, long long>(sum, count);
}); });
auto accessor = skiplist.access(); auto accessor = skiplist.access();
long long sums = 0; long long sums = 0;
long long counters = 0; long long counters = 0;
for (auto &data : collect(futures)) { for (auto &data : collect(futures)) {
sums += data.second.first; sums += data.second.first;
counters += data.second.second; counters += data.second.second;
} }
for (auto &e : accessor) { for (auto &e : accessor) {
sums -= e.second; sums -= e.second;
} }
permanent_assert(sums == 0, "Same values aren't present"); permanent_assert(sums == 0, "Same values aren't present");
check_size<map_t>(accessor, counters); check_size<map_t>(accessor, counters);
check_order<map_t>(accessor); check_order<map_t>(accessor);
});
} }