Implement jemalloc extent hooks memory tracker (#1250)

Should improve/fix issues where memory usage exceeds --memory-limit
Antonio Filipovic 2023-10-23 12:48:26 +02:00 committed by GitHub
parent 26e31ca06f
commit 7f7f3adfcb
17 changed files with 988 additions and 100 deletions
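For orientation: the change routes memory accounting through jemalloc's extent hooks. Each arena gets a wrapper hook table that delegates to jemalloc's default hooks and reports committed/decommitted extent sizes to the global memory tracker that enforces --memory-limit. The sketch below is illustrative only and is not the commit's code: the names g_tracked_bytes and g_default_hooks are placeholders, only alloc/dalloc are wrapped, and installation via mallctl is left out (the real implementation also wraps destroy, commit, decommit and purge_forced and reports into utils::total_memory_tracker).

```cpp
// Illustrative sketch only (not from this commit): wrap jemalloc's default extent
// hooks and count committed bytes in a global atomic. g_tracked_bytes and
// g_default_hooks are placeholder names.
#include <jemalloc/jemalloc.h>

#include <atomic>
#include <cstddef>
#include <cstdint>

static std::atomic<int64_t> g_tracked_bytes{0};
static extent_hooks_t *g_default_hooks = nullptr;  // fetched via mallctl("arena.<i>.extent_hooks", ...)

static void *tracking_alloc(extent_hooks_t *hooks, void *new_addr, size_t size, size_t alignment,
                            bool *zero, bool *commit, unsigned arena_ind) {
  // Note: the commit calls the tracker *before* delegating, so it can throw when the
  // limit would be exceeded; this sketch only counts after the fact.
  void *ptr = g_default_hooks->alloc(hooks, new_addr, size, alignment, zero, commit, arena_ind);
  if (ptr != nullptr && *commit) {
    g_tracked_bytes.fetch_add(static_cast<int64_t>(size), std::memory_order_relaxed);
  }
  return ptr;
}

static bool tracking_dalloc(extent_hooks_t *hooks, void *addr, size_t size, bool committed,
                            unsigned arena_ind) {
  const bool err = g_default_hooks->dalloc(hooks, addr, size, committed, arena_ind);
  if (!err && committed) {
    g_tracked_bytes.fetch_sub(static_cast<int64_t>(size), std::memory_order_relaxed);
  }
  return err;
}

// Before installing this table for an arena, the members left empty here must be
// copied from the default hooks (the commit does this for purge_lazy, split and merge),
// and the remaining operations should be wrapped the same way as above.
static extent_hooks_t tracking_hooks = {
    .alloc = &tracking_alloc,
    .dalloc = &tracking_dalloc,
};
```

Under this scheme the tracker follows jemalloc's committed arena footprint rather than individual new/delete calls, which is presumably what lets --memory-limit also account for allocator overhead and caching.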


@ -1,55 +0,0 @@
# Try to find jemalloc library
#
# Use this module as:
# find_package(Jemalloc)
#
# or:
# find_package(Jemalloc REQUIRED)
#
# This will define the following variables:
#
# Jemalloc_FOUND True if the system has the jemalloc library.
# Jemalloc_INCLUDE_DIRS Include directories needed to use jemalloc.
# Jemalloc_LIBRARIES Libraries needed to link to jemalloc.
#
# The following cache variables may also be set:
#
# Jemalloc_INCLUDE_DIR The directory containing jemalloc/jemalloc.h.
# Jemalloc_LIBRARY The path to the jemalloc static library.
find_path(Jemalloc_INCLUDE_DIR NAMES jemalloc/jemalloc.h PATH_SUFFIXES include)
find_library(Jemalloc_LIBRARY NAMES libjemalloc.a PATH_SUFFIXES lib)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Jemalloc
FOUND_VAR Jemalloc_FOUND
REQUIRED_VARS
Jemalloc_LIBRARY
Jemalloc_INCLUDE_DIR
)
if(Jemalloc_FOUND)
set(Jemalloc_LIBRARIES ${Jemalloc_LIBRARY})
set(Jemalloc_INCLUDE_DIRS ${Jemalloc_INCLUDE_DIR})
else()
if(Jemalloc_FIND_REQUIRED)
message(FATAL_ERROR "Cannot find jemalloc!")
else()
message(WARNING "jemalloc is not found!")
endif()
endif()
if(Jemalloc_FOUND AND NOT TARGET Jemalloc::Jemalloc)
add_library(Jemalloc::Jemalloc UNKNOWN IMPORTED)
set_target_properties(Jemalloc::Jemalloc
PROPERTIES
IMPORTED_LOCATION "${Jemalloc_LIBRARY}"
INTERFACE_INCLUDE_DIRECTORIES "${Jemalloc_INCLUDE_DIR}"
)
endif()
mark_as_advanced(
Jemalloc_INCLUDE_DIR
Jemalloc_LIBRARY
)

cmake/Findjemalloc.cmake Normal file

@ -0,0 +1,67 @@
# Try to find jemalloc library
#
# Use this module as:
# find_package(jemalloc)
#
# or:
# find_package(jemalloc REQUIRED)
#
# This will define the following variables:
#
# JEMALLOC_FOUND True if the system has the jemalloc library.
# Jemalloc_INCLUDE_DIRS Include directories needed to use jemalloc.
# Jemalloc_LIBRARIES Libraries needed to link to jemalloc.
#
# The following cache variables may also be set:
#
# Jemalloc_INCLUDE_DIR The directory containing jemalloc/jemalloc.h.
# Jemalloc_LIBRARY The path to the jemalloc static library.
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(jemalloc
FOUND_VAR JEMALLOC_FOUND
REQUIRED_VARS
JEMALLOC_LIBRARY
JEMALLOC_INCLUDE_DIR
)
if(JEMALLOC_INCLUDE_DIR)
message(STATUS "Found jemalloc include dir: ${JEMALLOC_INCLUDE_DIR}")
else()
message(WARNING "jemalloc not found!")
endif()
if(JEMALLOC_LIBRARY)
message(STATUS "Found jemalloc library: ${JEMALLOC_LIBRARY}")
else()
message(WARNING "jemalloc library not found!")
endif()
if(JEMALLOC_FOUND)
set(Jemalloc_LIBRARIES ${JEMALLOC_LIBRARY})
set(Jemalloc_INCLUDE_DIRS ${JEMALLOC_INCLUDE_DIR})
else()
if(Jemalloc_FIND_REQUIRED)
message(FATAL_ERROR "Cannot find jemalloc!")
else()
message(WARNING "jemalloc is not found!")
endif()
endif()
if(JEMALLOC_FOUND AND NOT TARGET Jemalloc::Jemalloc)
message(STATUS "JEMALLOC NOT TARGET")
add_library(Jemalloc::Jemalloc UNKNOWN IMPORTED)
set_target_properties(Jemalloc::Jemalloc
PROPERTIES
IMPORTED_LOCATION "${JEMALLOC_LIBRARY}"
INTERFACE_INCLUDE_DIRECTORIES "${JEMALLOC_INCLUDE_DIR}"
)
endif()
mark_as_advanced(
JEMALLOC_INCLUDE_DIR
JEMALLOC_LIBRARY
)


@ -15,7 +15,6 @@ set(GFLAGS_NOTHREADS OFF)
# NOTE: config/generate.py depends on the gflags help XML format.
find_package(gflags REQUIRED)
find_package(fmt 8.0.1)
find_package(Jemalloc REQUIRED)
find_package(ZLIB 1.2.11 REQUIRED)
set(LIB_DIR ${CMAKE_CURRENT_SOURCE_DIR})
@ -99,6 +98,17 @@ macro(import_external_library name type library_location include_dir)
import_library(${name} ${type} ${${_upper_name}_LIBRARY} ${${_upper_name}_INCLUDE_DIR})
endmacro(import_external_library)
macro(set_path_external_library name type library_location include_dir)
string(TOUPPER ${name} _upper_name)
set(${_upper_name}_LIBRARY ${library_location} CACHE FILEPATH
"Path to ${name} library" FORCE)
set(${_upper_name}_INCLUDE_DIR ${include_dir} CACHE FILEPATH
"Path to ${name} include directory" FORCE)
mark_as_advanced(${name}_LIBRARY ${name}_INCLUDE_DIR)
endmacro(set_path_external_library)
# setup antlr
import_external_library(antlr4 STATIC
${CMAKE_CURRENT_SOURCE_DIR}/antlr4/runtime/Cpp/lib/libantlr4-runtime.a
@ -265,3 +275,8 @@ import_header_library(ctre ${CMAKE_CURRENT_SOURCE_DIR})
# setup absl (cmake sub_directory tolerant)
set(ABSL_PROPAGATE_CXX_STD ON)
add_subdirectory(absl EXCLUDE_FROM_ALL)
# set Jemalloc
set_path_external_library(jemalloc STATIC
${CMAKE_CURRENT_SOURCE_DIR}/jemalloc/lib/libjemalloc.a
${CMAKE_CURRENT_SOURCE_DIR}/jemalloc/include/)


@ -124,6 +124,7 @@ declare -A primary_urls=(
["librdtsc"]="http://$local_cache_host/git/librdtsc.git"
["ctre"]="http://$local_cache_host/file/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp"
["absl"]="https://$local_cache_host/git/abseil-cpp.git"
["jemalloc"]="https://$local_cache_host/git/jemalloc.git"
)
# The goal of secondary urls is to have links to the "source of truth" of
@ -151,6 +152,7 @@ declare -A secondary_urls=(
["librdtsc"]="https://github.com/gabrieleara/librdtsc.git"
["ctre"]="https://raw.githubusercontent.com/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp"
["absl"]="https://github.com/abseil/abseil-cpp.git"
["jemalloc"]="https://github.com/jemalloc/jemalloc.git"
)
# antlr
@ -252,3 +254,21 @@ cd ..
# abseil 20230125.3
absl_ref="20230125.3"
repo_clone_try_double "${primary_urls[absl]}" "${secondary_urls[absl]}" "absl" "$absl_ref"
# jemalloc ea6b3e973b477b8061e0076bb257dbd7f3faa756
JEMALLOC_COMMIT_VERSION="5.2.1"
repo_clone_try_double "${secondary_urls[jemalloc]}" "${secondary_urls[jemalloc]}" "jemalloc" "$JEMALLOC_COMMIT_VERSION"
# This is a hack so that cmake in libs can set the path and FindJemalloc can use Jemalloc_INCLUDE_DIR
pushd jemalloc
./autogen.sh
MALLOC_CONF="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000" \
./configure \
--disable-cxx \
--enable-shared=no --prefix=$working_dir \
--with-malloc-conf="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000"
make -j$CPUS install
popd
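The configure invocation above bakes the MALLOC_CONF defaults (retain:false, percpu_arena:percpu, oversize_threshold:0, 5000 ms dirty/muzzy decay) into the bundled static library through --with-malloc-conf. As an aside (not part of the commit), those baked-in defaults can be sanity-checked at runtime through jemalloc's standard opt.* mallctl keys; a minimal sketch, assuming the binary links the bundled jemalloc:

```cpp
// Sanity-check sketch (not from this commit): read back a few opt.* values to
// confirm the bundled jemalloc was built with the expected MALLOC_CONF defaults.
#include <jemalloc/jemalloc.h>

#include <sys/types.h>

#include <cstdio>

int main() {
  bool retain = true;
  size_t sz = sizeof(retain);
  if (mallctl("opt.retain", &retain, &sz, nullptr, 0) == 0) {
    std::printf("opt.retain = %s\n", retain ? "true" : "false");  // expected: false
  }

  ssize_t dirty_decay_ms = 0;
  sz = sizeof(dirty_decay_ms);
  if (mallctl("opt.dirty_decay_ms", &dirty_decay_ms, &sz, nullptr, 0) == 0) {
    std::printf("opt.dirty_decay_ms = %zd\n", dirty_decay_ms);  // expected: 5000
  }

  const char *percpu_arena = nullptr;
  sz = sizeof(percpu_arena);
  if (mallctl("opt.percpu_arena", &percpu_arena, &sz, nullptr, 0) == 0) {
    std::printf("opt.percpu_arena = %s\n", percpu_arena);  // expected: percpu
  }
  return 0;
}
```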


@ -23,6 +23,7 @@
#include "glue/run_id.hpp"
#include "helpers.hpp"
#include "license/license_sender.hpp"
#include "memory/memory_control.hpp"
#include "query/config.hpp"
#include "query/discard_value_stream.hpp"
#include "query/interpreter.hpp"
@ -108,6 +109,7 @@ void InitSignalHandlers(const std::function<void()> &shutdown_fun) {
}
int main(int argc, char **argv) {
memgraph::memory::SetHooks();
google::SetUsageMessage("Memgraph database server");
gflags::SetVersionString(version_string);
@ -199,7 +201,6 @@ int main(int argc, char **argv) {
"won't be available.");
}
}
std::cout << "You are running Memgraph v" << gflags::VersionString() << std::endl;
std::cout << "To get started with Memgraph, visit https://memgr.ph/start" << std::endl;


@ -2,7 +2,9 @@ set(memory_src_files
new_delete.cpp
memory_control.cpp)
find_package(Jemalloc REQUIRED)
find_package(jemalloc REQUIRED)
add_library(mg-memory STATIC ${memory_src_files})
target_link_libraries(mg-memory mg-utils fmt)


@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -10,6 +10,8 @@
// licenses/APL.txt.
#include "memory_control.hpp"
#include "utils/logging.hpp"
#include "utils/memory_tracker.hpp"
#if USE_JEMALLOC
#include <jemalloc/jemalloc.h>
@ -22,6 +24,179 @@ namespace memgraph::memory {
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define STRINGIFY(x) STRINGIFY_HELPER(x)
#if USE_JEMALLOC
static void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t alignment, bool *zero,
bool *commit, unsigned arena_ind);
static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind);
static void my_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind);
static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
unsigned arena_ind);
static bool my_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
unsigned arena_ind);
static bool my_purge_forced(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
unsigned arena_ind);
extent_hooks_t *old_hooks = nullptr;
static extent_hooks_t custom_hooks = {
.alloc = &my_alloc,
.dalloc = &my_dalloc,
.destroy = &my_destroy,
.commit = &my_commit,
.decommit = &my_decommit,
.purge_lazy = nullptr,
.purge_forced = &my_purge_forced,
.split = nullptr,
.merge = nullptr,
};
static const extent_hooks_t *new_hooks = &custom_hooks;
void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t alignment, bool *zero, bool *commit,
unsigned arena_ind) {
// This must come before the underlying allocation so the tracker can throw if the allocation would exceed the memory limit
if (*commit) [[likely]] {
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
}
auto *ptr = old_hooks->alloc(extent_hooks, new_addr, size, alignment, zero, commit, arena_ind);
if (ptr == nullptr) [[unlikely]] {
if (*commit) {
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
}
return ptr;
}
return ptr;
}
static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind) {
auto err = old_hooks->dalloc(extent_hooks, addr, size, committed, arena_ind);
if (err) [[unlikely]] {
return err;
}
if (committed) [[likely]] {
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
}
return false;
}
static void my_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind) {
if (committed) [[likely]] {
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
}
old_hooks->destroy(extent_hooks, addr, size, committed, arena_ind);
}
static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
unsigned arena_ind) {
auto err = old_hooks->commit(extent_hooks, addr, size, offset, length, arena_ind);
if (err) {
return err;
}
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length));
return false;
}
static bool my_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
unsigned arena_ind) {
MG_ASSERT(old_hooks && old_hooks->decommit);
auto err = old_hooks->decommit(extent_hooks, addr, size, offset, length, arena_ind);
if (err) {
return err;
}
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(length));
return false;
}
static bool my_purge_forced(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
unsigned arena_ind) {
MG_ASSERT(old_hooks && old_hooks->purge_forced);
auto err = old_hooks->purge_forced(extent_hooks, addr, size, offset, length, arena_ind);
if (err) [[unlikely]] {
return err;
}
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(length));
return false;
}
#endif
void SetHooks() {
#if USE_JEMALLOC
uint64_t allocated{0};
uint64_t sz{sizeof(allocated)};
sz = sizeof(unsigned);
unsigned n_arenas{0};
int err = mallctl("opt.narenas", (void *)&n_arenas, &sz, nullptr, 0);
if (err) {
return;
}
if (nullptr != old_hooks) {
return;
}
for (int i = 0; i < n_arenas; i++) {
std::string func_name = "arena." + std::to_string(i) + ".extent_hooks";
size_t hooks_len = sizeof(old_hooks);
int err = mallctl(func_name.c_str(), &old_hooks, &hooks_len, nullptr, 0);
if (err) {
LOG_FATAL("Error getting hooks for jemalloc arena {}", i);
}
// Due to the way jemalloc works, we first need to set the default hooks,
// which triggers arena creation; only then can we install our custom hook wrappers
err = mallctl(func_name.c_str(), nullptr, nullptr, &old_hooks, sizeof(old_hooks));
MG_ASSERT(old_hooks);
MG_ASSERT(old_hooks->alloc);
MG_ASSERT(old_hooks->dalloc);
MG_ASSERT(old_hooks->destroy);
MG_ASSERT(old_hooks->commit);
MG_ASSERT(old_hooks->decommit);
MG_ASSERT(old_hooks->purge_forced);
MG_ASSERT(old_hooks->purge_lazy);
MG_ASSERT(old_hooks->split);
MG_ASSERT(old_hooks->merge);
custom_hooks.purge_lazy = old_hooks->purge_lazy;
custom_hooks.split = old_hooks->split;
custom_hooks.merge = old_hooks->merge;
if (err) {
LOG_FATAL("Error setting jemalloc hooks for jemalloc arena {}", i);
}
err = mallctl(func_name.c_str(), nullptr, nullptr, &new_hooks, sizeof(new_hooks));
if (err) {
LOG_FATAL("Error setting custom hooks for jemalloc arena {}", i);
}
}
#endif
}
void PurgeUnusedMemory() {
#if USE_JEMALLOC
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
@ -30,4 +205,5 @@ void PurgeUnusedMemory() {
#undef STRINGIFY
#undef STRINGIFY_HELPER
} // namespace memgraph::memory
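The comment inside SetHooks points at the subtle part of the installation: the default hook table has to be fetched first (and, in the commit, written back once so the arena actually gets created) before the wrapper table can be installed, because the wrappers delegate to those defaults. Below is a compressed, illustrative sketch of that read/copy/write sequence for a single arena; InstallHooksForArena is a hypothetical name and error handling is reduced to a bool, so this is not a drop-in replacement for SetHooks.

```cpp
// Illustrative sketch (hypothetical helper, not from this commit).
#include <jemalloc/jemalloc.h>

#include <cstddef>
#include <string>

extern extent_hooks_t custom_hooks;  // the wrapper table, assumed to be defined elsewhere

bool InstallHooksForArena(unsigned arena_index) {
  const std::string name = "arena." + std::to_string(arena_index) + ".extent_hooks";

  // 1) Read the current (default) hooks so the wrappers know what to delegate to.
  //    The commit additionally writes these defaults back once, which triggers
  //    creation of the arena, before installing the custom table.
  extent_hooks_t *defaults = nullptr;
  size_t len = sizeof(defaults);
  if (mallctl(name.c_str(), &defaults, &len, nullptr, 0) != 0 || defaults == nullptr) {
    return false;
  }

  // 2) Keep jemalloc's default behaviour for the operations we do not wrap.
  custom_hooks.purge_lazy = defaults->purge_lazy;
  custom_hooks.split = defaults->split;
  custom_hooks.merge = defaults->merge;

  // 3) Install the wrapper table; from now on jemalloc calls it for this arena's extents.
  extent_hooks_t *new_table = &custom_hooks;
  return mallctl(name.c_str(), nullptr, nullptr, &new_table, sizeof(new_table)) == 0;
}
```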


@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -11,6 +11,11 @@
#pragma once
#include <cstddef>
#include "utils/logging.hpp"
namespace memgraph::memory {
void PurgeUnusedMemory();
void SetHooks();
} // namespace memgraph::memory


@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -87,21 +87,15 @@ void deleteSized(void *ptr, const std::size_t /*unused*/, const std::align_val_t
#endif
void TrackMemory(std::size_t size) {
#if USE_JEMALLOC
if (size != 0) [[likely]] {
size = nallocx(size, 0);
}
#endif
#if !USE_JEMALLOC
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
#endif
}
void TrackMemory(std::size_t size, const std::align_val_t align) {
#if USE_JEMALLOC
if (size != 0) [[likely]] {
size = nallocx(size, MALLOCX_ALIGN(align)); // NOLINT(hicpp-signed-bitwise)
}
#endif
#if !USE_JEMALLOC
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
#endif
}
bool TrackMemoryNoExcept(const std::size_t size) {
@ -126,11 +120,7 @@ bool TrackMemoryNoExcept(const std::size_t size, const std::align_val_t align) {
void UntrackMemory([[maybe_unused]] void *ptr, [[maybe_unused]] std::size_t size = 0) noexcept {
try {
#if USE_JEMALLOC
if (ptr != nullptr) [[likely]] {
memgraph::utils::total_memory_tracker.Free(sallocx(ptr, 0));
}
#else
#if !USE_JEMALLOC
if (size) {
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
} else {
@ -144,11 +134,7 @@ void UntrackMemory([[maybe_unused]] void *ptr, [[maybe_unused]] std::size_t size
void UntrackMemory(void *ptr, const std::align_val_t align, [[maybe_unused]] std::size_t size = 0) noexcept {
try {
#if USE_JEMALLOC
if (ptr != nullptr) [[likely]] {
memgraph::utils::total_memory_tracker.Free(sallocx(ptr, MALLOCX_ALIGN(align))); // NOLINT(hicpp-signed-bitwise)
}
#else
#if !USE_JEMALLOC
if (size) {
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
} else {

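The new_delete.cpp hunks above drop the per-allocation tracking path when jemalloc is in use: the tracker calls end up guarded by #if !USE_JEMALLOC, since the extent hooks account that memory at the arena level instead. For context, here is a small illustrative sketch (not from the commit) of the jemalloc non-standard API the per-allocation path relies on: nallocx reports the size class a request would be rounded to, and sallocx the usable size of a live allocation.

```cpp
// Illustrative sketch (not from this commit) of the jemalloc non-standard API used
// by the per-allocation tracking path.
#include <jemalloc/jemalloc.h>

#include <cstdio>

int main() {
  const size_t requested = 100;
  // nallocx: the size class jemalloc would actually serve for this request size.
  const size_t size_class = nallocx(requested, 0);
  // sallocx: the usable size of a live allocation made through mallocx.
  void *p = mallocx(requested, 0);
  std::printf("requested=%zu size-class=%zu usable=%zu\n", requested, size_class, sallocx(p, 0));
  dallocx(p, 0);
  return 0;
}
```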

@ -1,7 +1,7 @@
disk_storage: &disk_storage
cluster:
main:
args: ["--bolt-port", "7687", "--log-level", "TRACE", "--memory-limit", "50"]
args: ["--bolt-port", "7687", "--log-level", "TRACE", "--memory-limit", "125"]
log_file: "disk_storage.log"
setup_queries: []
validation_queries: []


@ -130,9 +130,14 @@ class MemgraphInstanceRunner:
def stop(self):
if not self.is_running():
return
self.proc_mg.terminate()
code = self.proc_mg.wait()
assert code == 0, "The Memgraph process exited with non-zero!"
pid = self.proc_mg.pid
try:
os.kill(pid, 15) # 15 is the signal number for SIGTERM
except OSError:
assert False
time.sleep(1)
def kill(self):
if not self.is_running():


@ -2,7 +2,7 @@ bolt_port: &bolt_port "7687"
args: &args
- "--bolt-port"
- *bolt_port
- "--memory-limit=1000"
- "--memory-limit=1024"
- "--storage-gc-cycle-sec=180"
- "--log-level=TRACE"


@ -100,6 +100,25 @@ def execute_till_success(session, query, max_retries=1000):
raise Exception("Query '%s' failed %d times, aborting" % (query, max_retries))
def try_execute(session, query: str):
"""
Executes a query within a Bolt session.
:param session: active Bolt session to execute the query with
:param query: query to execute
:return: tuple (data, summary) produced by the query
"""
result = session.run(query)
data = result.data()
summary = result.consume()
return data, summary
def batch(input, batch_size):
"""Batches the given input (must be iterable).
Supports input generators. Returns a generator.


@ -8,7 +8,7 @@ import subprocess
import time
from argparse import Namespace as Args
from subprocess import Popen
from typing import Dict, List
from typing import Dict, List, Optional
from test_config import LARGE_DATASET, SMALL_DATASET, DatabaseMode, DatasetConstants
@ -79,7 +79,7 @@ def wait_for_server(port, delay=0.1) -> None:
time.sleep(delay)
def start_memgraph(args: Args) -> Popen:
def start_memgraph(args: Args, memgraph_options: List[str]) -> Popen:
"""Starts Memgraph and return the process"""
cwd = os.path.dirname(args.memgraph)
cmd = [
@ -93,7 +93,7 @@ def start_memgraph(args: Args) -> Popen:
"--storage-recover-on-startup=false",
"--query-execution-timeout-sec=1200",
"--bolt-server-name-for-init=Neo4j/",
]
] + memgraph_options
if not args.verbose:
cmd += ["--log-level", "WARNING"]
if args.log_file:
@ -168,7 +168,7 @@ def _find_test_binary(args: Args, test: str) -> List[str]:
raise Exception("Test '{}' binary not supported!".format(test))
def run_stress_test_suite(args: Args) -> Dict[str, float]:
def run_stress_test_suite(args: Args) -> Optional[Dict[str, float]]:
def cleanup(memgraph_proc):
if memgraph_proc.poll() != None:
return
@ -192,16 +192,30 @@ def run_stress_test_suite(args: Args) -> Dict[str, float]:
for mode in test[DatasetConstants.MODE]:
test_run = copy.deepcopy(test)
# Run for every specified combination of storage mode, serialization mode, etc (if extended)
test_run[DatasetConstants.MODE] = mode
memgraph_proc = start_memgraph(args)
runtime = run_test(args, **test_run)
runtimes[os.path.splitext(test[DatasetConstants.TEST])[0]] = runtime
stop_memgraph(memgraph_proc)
cleanup(memgraph_proc)
try:
memgraph_options = (
test_run[DatasetConstants.MEMGRAPH_OPTIONS] if DatasetConstants.MEMGRAPH_OPTIONS in test_run else []
)
if DatasetConstants.MEMGRAPH_OPTIONS in test_run:
del test_run[DatasetConstants.MEMGRAPH_OPTIONS]
memgraph_proc = start_memgraph(args, memgraph_options)
except Exception as ex:
print("Exception occured while starting memgraph", ex)
return None
err = False
try:
runtime = run_test(args, **test_run)
runtimes[os.path.splitext(test[DatasetConstants.TEST])[0]] = runtime
except Exception as ex:
print(f"Failed to execute {test_run} with following exception:", ex)
err = True
finally:
stop_memgraph(memgraph_proc)
cleanup(memgraph_proc)
if err:
return None
return runtimes
@ -225,10 +239,12 @@ if __name__ == "__main__":
generate_temporary_ssl_certs()
runtimes = run_stress_test_suite(args)
if runtimes is None:
print("Some stress tests have failed")
exit(1)
assert runtimes is not None
if args.use_ssl:
remove_certificates()
write_stats(runtimes)
print("Successfully ran stress tests!")


@ -0,0 +1,257 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
"""
Stress test that checks how the memory tracker enforces the --memory-limit
flag under heavy concurrent node creation: once the tracked usage reaches the
limit, further CREATE queries are expected to fail.
"""
import atexit
import logging
import multiprocessing
import time
from argparse import Namespace as Args
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Tuple
from common import (
OutputData,
SessionCache,
connection_argument_parser,
execute_till_success,
try_execute,
)
log = logging.getLogger(__name__)
output_data = OutputData()
class Constants:
CREATE_FUNCTION = "CREATE"
atexit.register(SessionCache.cleanup)
MEMORY_LIMIT = 2048
def parse_args() -> Args:
"""
Parses user arguments
:return: parsed arguments
"""
parser = connection_argument_parser()
parser.add_argument("--worker-count", type=int, default=5, help="Number of concurrent workers.")
parser.add_argument(
"--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level"
)
parser.add_argument("--repetition-count", type=int, default=1000, help="Number of times to perform the action")
parser.add_argument("--isolation-level", type=str, required=True, help="Database isolation level.")
parser.add_argument("--storage-mode", type=str, required=True, help="Database storage mode.")
return parser.parse_args()
# Global variables
args = parse_args()
# Difference between RES memory and the memory tracker at Memgraph start.
# Because RES includes various other allocations, the initial difference is roughly ~30 MB.
initial_diff = 0
@dataclass
class Worker:
"""
Class that performs a function defined in the `type` argument.
Args:
type - the function to be performed by the worker (this test only uses `CREATE`)
id - worker id
total_worker_cnt - total number of workers for reference
repetition_count - number of times to perform the worker action
sleep_sec - float for subsecond sleeping between two subsequent actions
"""
type: str
id: int
total_worker_cnt: int
repetition_count: int
sleep_sec: float
def timed_function(name) -> Callable:
"""
Times performed function
"""
def actual_decorator(func) -> Callable:
@wraps(func)
def timed_wrapper(*args, **kwargs) -> Any:
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
output_data.add_measurement(name, end_time - start_time)
return result
return timed_wrapper
return actual_decorator
@timed_function("cleanup_time")
def clean_database() -> None:
session = SessionCache.argument_session(args)
execute_till_success(session, "MATCH (n) DETACH DELETE n")
def create_indices() -> None:
session = SessionCache.argument_session(args)
execute_till_success(session, "CREATE INDEX ON :Node")
def setup_database_mode() -> None:
session = SessionCache.argument_session(args)
execute_till_success(session, f"STORAGE MODE {args.storage_mode}")
execute_till_success(session, f"SET GLOBAL TRANSACTION ISOLATION LEVEL {args.isolation_level}")
def get_tracker_data(session) -> Optional[float]:
def parse_data(allocated: str) -> float:
num = 0
if "KiB" in allocated or "MiB" in allocated or "GiB" in allocated or "TiB" in allocated:
num = float(allocated[:-3])
else:
num = float(allocated[-1])
if "KiB" in allocated:
return num / 1024
if "MiB" in allocated:
return num
if "GiB" in allocated:
return num * 1024
else:
return num * 1024 * 1024
def isolate_value(data: List[Dict[str, Any]], key: str) -> Optional[str]:
for dict in data:
if dict["storage info"] == key:
return dict["value"]
return None
try:
data, _ = try_execute(session, f"SHOW STORAGE INFO")
memory_tracker_data = isolate_value(data, "memory_allocated")
return parse_data(memory_tracker_data)
except Exception as ex:
log.info(f"Get storage info failed with error", ex)
return None
def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int:
"""
This writer creates a lot of nodes on each write.
It also checks that the query fails once the memory limit would be exceeded.
"""
session = SessionCache.argument_session(args)
def create() -> bool:
"""
Returns True to keep creating, False once the memory limit has been reached and the query correctly failed
"""
memory_tracker_data_before_start = get_tracker_data(session)
should_fail = memory_tracker_data_before_start >= 2048
failed = False
try:
try_execute(
session,
f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))",
)
except Exception as ex:
failed = True
output = str(ex)
log.info("Exception in create", output)
assert "Memory limit exceeded!" in output
if should_fail:
assert failed, "Query should have failed"
return False
return True
curr_repetition = 0
while curr_repetition < repetition_count:
log.info(f"Worker {worker_id} started iteration {curr_repetition}")
should_continue = create()
if not should_continue:
return True
time.sleep(sleep_sec)
log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}")
curr_repetition += 1
def execute_function(worker: Worker) -> Worker:
"""
Executes the function based on the worker type
"""
if worker.type == Constants.CREATE_FUNCTION:
run_writer(worker.repetition_count, worker.sleep_sec, worker.id)
log.info(f"Worker {worker.type} finished!")
return worker
raise Exception("Worker function not recognized, raising exception!")
@timed_function("total_execution_time")
def execution_handler() -> None:
clean_database()
log.info("Database is clean.")
setup_database_mode()
create_indices()
worker_count = args.worker_count
rep_count = args.repetition_count
workers = []
for i in range(worker_count):
workers.append(Worker(Constants.CREATE_FUNCTION, i, worker_count, rep_count, 0.1))
with multiprocessing.Pool(processes=worker_count) as p:
for worker in p.map(execute_function, workers):
log.info(f"Worker {worker.type} finished!")
if __name__ == "__main__":
logging.basicConfig(level=args.logging)
execution_handler()
if args.logging in ["DEBUG", "INFO"]:
output_data.dump()


@ -0,0 +1,359 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
"""
Stress test that monitors how the memory tracker behaves under heavy node
creation and deletion, compared against RES memory usage.
"""
import atexit
import logging
import multiprocessing
import time
from argparse import Namespace as Args
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Tuple
from common import (
OutputData,
SessionCache,
connection_argument_parser,
execute_till_success,
try_execute,
)
log = logging.getLogger(__name__)
output_data = OutputData()
class Constants:
CREATE_FUNCTION = "CREATE"
DELETE_FUNCTION = "DELETE"
MONITOR_CLEANUP_FUNCTION = "MONITOR_CLEANUP"
atexit.register(SessionCache.cleanup)
def parse_args() -> Args:
"""
Parses user arguments
:return: parsed arguments
"""
parser = connection_argument_parser()
parser.add_argument("--worker-count", type=int, default=5, help="Number of concurrent workers.")
parser.add_argument(
"--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level"
)
parser.add_argument("--repetition-count", type=int, default=1000, help="Number of times to perform the action")
parser.add_argument("--isolation-level", type=str, required=True, help="Database isolation level.")
parser.add_argument("--storage-mode", type=str, required=True, help="Database storage mode.")
return parser.parse_args()
# Global variables
args = parse_args()
# Difference between RES memory and the memory tracker at Memgraph start.
# Because RES includes various other allocations, the initial difference is roughly ~30 MB.
initial_diff = 0
@dataclass
class Worker:
"""
Class that performs a function defined in the `type` argument.
Args:
type - one of `CREATE`, `DELETE` or `MONITOR_CLEANUP`, signifying the function
to be performed by the worker
id - worker id
total_worker_cnt - total number of workers for reference
repetition_count - number of times to perform the worker action
sleep_sec - float for subsecond sleeping between two subsequent actions
"""
type: str
id: int
total_worker_cnt: int
repetition_count: int
sleep_sec: float
def timed_function(name) -> Callable:
"""
Times performed function
"""
def actual_decorator(func) -> Callable:
@wraps(func)
def timed_wrapper(*args, **kwargs) -> Any:
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
output_data.add_measurement(name, end_time - start_time)
return result
return timed_wrapper
return actual_decorator
@timed_function("cleanup_time")
def clean_database() -> None:
session = SessionCache.argument_session(args)
execute_till_success(session, "MATCH (n) DETACH DELETE n")
def create_indices() -> None:
session = SessionCache.argument_session(args)
execute_till_success(session, "CREATE INDEX ON :Node")
def setup_database_mode() -> None:
session = SessionCache.argument_session(args)
execute_till_success(session, f"STORAGE MODE {args.storage_mode}")
execute_till_success(session, f"SET GLOBAL TRANSACTION ISOLATION LEVEL {args.isolation_level}")
def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int:
"""
This writer creates a lot of nodes on each write.
"""
session = SessionCache.argument_session(args)
def create() -> bool:
exception_occured = False
memory_allocated_before, _ = get_storage_data(session)
count_before = execute_till_success(session, f"MATCH (n) RETURN COUNT(n) AS cnt")[0][0]["cnt"]
try:
try_execute(
session,
f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))",
)
except Exception as ex:
log.info(f"Exception occured during create: {ex}")
exception_occured = True
memory_allocated_after, _ = get_storage_data(session)
count_after = execute_till_success(session, f"MATCH (n) RETURN COUNT(n) AS cnt")[0][0]["cnt"]
if exception_occured:
log.info(
f"Exception occured, stopping exection of run {Constants.CREATE_FUNCTION} worker."
f"Memory stats: before query: {memory_allocated_before}, after query: {memory_allocated_after}."
f"Node stats: before query {count_before}, after query {count_after}"
)
return False
return True
curr_repetition = 0
while curr_repetition < repetition_count:
log.info(f"Worker {worker_id} started iteration {curr_repetition}")
success = create()
assert success, "Create was not successful"
time.sleep(sleep_sec)
log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}")
curr_repetition += 1
def run_deleter(repetition_count: int, sleep_sec: float) -> None:
"""
Deletes the whole graph.
"""
session = SessionCache.argument_session(args)
def delete_graph() -> None:
try:
execute_till_success(session, f"MATCH (n) DETACH DELETE n")
log.info(f"Worker deleted all nodes")
except Exception as ex:
log.info(f"Worker failed to delete")
pass
curr_repetition = 0
while curr_repetition < repetition_count:
delete_graph()
time.sleep(sleep_sec)
curr_repetition += 1
def get_storage_data(session) -> Tuple[float, float]:
def parse_data(allocated: str) -> float:
num = 0
if "KiB" in allocated or "MiB" in allocated or "GiB" in allocated or "TiB" in allocated:
num = float(allocated[:-3])
else:
num = float(allocated[-1])
if "KiB" in allocated:
return num / 1024
if "MiB" in allocated:
return num
if "GiB" in allocated:
return num * 1024
else:
return num * 1024 * 1024
def isolate_value(data: List[Dict[str, Any]], key: str) -> Optional[str]:
for dict in data:
if dict["storage info"] == key:
return dict["value"]
return None
try:
data = execute_till_success(session, f"SHOW STORAGE INFO")[0]
res_data = isolate_value(data, "memory_usage")
memory_tracker_data = isolate_value(data, "memory_allocated")
log.info(
f"Worker {Constants.MONITOR_CLEANUP_FUNCTION} logged memory: memory tracker {memory_tracker_data} vs res data {res_data}"
)
return parse_data(memory_tracker_data), parse_data(res_data)
except Exception as ex:
log.info(f"Get storage info failed with error", ex)
raise Exception(f"Worker {Constants.MONITOR_CLEANUP_FUNCTION} can't continue working")
def run_monitor_cleanup(repetition_count: int, sleep_sec: float) -> None:
"""
Monitors the graph and performs periodic cleanup.
The cleanup lives in this function so the thread can sleep for a while
afterwards, giving RES and the memory tracker time to stabilize and
reducing the flakiness of the test.
"""
session = SessionCache.argument_session(args)
curr_repetition = 0
while curr_repetition < repetition_count:
memory_tracker, res_data = get_storage_data(session)
if memory_tracker < res_data and ((memory_tracker + initial_diff) < res_data):
# Maybe RES was measured at a bad moment.
# The problem with combining DETACH DELETE and the memory tracker
# is that the tracker is updated immediately,
# whereas RES takes some time to drop.
cnt_again = 3
skip_failure = False
# 10% is the maximum allowed increment; beyond that the check fails
multiplier = 1
while cnt_again:
new_memory_tracker, new_res_data = get_storage_data(session)
if new_memory_tracker > new_res_data or (
(new_memory_tracker + initial_diff) * multiplier > new_res_data
):
skip_failure = True
log.info(
f"Skipping failure on new data:"
f"memory tracker: {new_memory_tracker}, initial diff: {initial_diff},"
f"RES data: {new_res_data}, multiplier: {multiplier}"
)
break
multiplier += 0.05
cnt_again -= 1
if not skip_failure:
log.info(f"memory tracker: {memory_tracker}, initial diff: {initial_diff}, RES: {res_data}")
assert False, "Memory tracker is off by more than 10%, check logs for details"
def run_cleanup():
try:
execute_till_success(session, f"FREE MEMORY")
log.info(f"Worker deleted all nodes")
except Exception as ex:
log.info(f"Worker failed to delete")
pass
# The idea is to run the cleanup from this thread and then sleep for a while
# so that RES can stabilize
if curr_repetition % 10 == 0:
run_cleanup()
time.sleep(sleep_sec)
curr_repetition += 1
def execute_function(worker: Worker) -> Worker:
"""
Executes the function based on the worker type
"""
if worker.type == Constants.CREATE_FUNCTION:
run_writer(worker.repetition_count, worker.sleep_sec, worker.id)
log.info(f"Worker {worker.type} finished!")
return worker
elif worker.type == Constants.DELETE_FUNCTION:
run_deleter(worker.repetition_count, worker.sleep_sec)
log.info(f"Worker {worker.type} finished!")
return worker
elif worker.type == Constants.MONITOR_CLEANUP_FUNCTION:
run_monitor_cleanup(worker.repetition_count, worker.sleep_sec)
log.info(f"Worker {worker.type} finished!")
return worker
raise Exception("Worker function not recognized, raising exception!")
@timed_function("total_execution_time")
def execution_handler() -> None:
clean_database()
log.info("Database is clean.")
setup_database_mode()
create_indices()
worker_count = args.worker_count
rep_count = args.repetition_count
workers = []
for i in range(worker_count - 2):
workers.append(Worker(Constants.CREATE_FUNCTION, i, worker_count - 2, rep_count, 0.1))
workers.append(Worker(Constants.DELETE_FUNCTION, -1, 1, rep_count * 1.5, 0.1))
workers.append(Worker(Constants.MONITOR_CLEANUP_FUNCTION, -1, 1, rep_count * 1.5, 0.1))
with multiprocessing.Pool(processes=worker_count) as p:
for worker in p.map(execute_function, workers):
log.info(f"Worker {worker.type} finished!")
if __name__ == "__main__":
logging.basicConfig(level=args.logging)
session = SessionCache.argument_session(args)
memory_tracker_size, memory_res = get_storage_data(session)
# global var used in monitoring
initial_diff = memory_res - memory_tracker_size
execution_handler()
if args.logging in ["DEBUG", "INFO"]:
output_data.dump()


@ -12,6 +12,7 @@ class DatasetConstants:
OPTIONS = "options"
TIMEOUT = "timeout"
MODE = "mode"
MEMGRAPH_OPTIONS = "memgraph_options"
@dataclass
@ -68,6 +69,20 @@ SMALL_DATASET = [
DatasetConstants.TIMEOUT: 5,
DatasetConstants.MODE: [get_default_database_mode()],
},
{
DatasetConstants.TEST: "memory_tracker.py",
DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"],
DatasetConstants.TIMEOUT: 5,
DatasetConstants.MODE: [get_default_database_mode()],
DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"],
},
{
DatasetConstants.TEST: "memory_limit.py",
DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"],
DatasetConstants.TIMEOUT: 5,
DatasetConstants.MODE: [get_default_database_mode()],
DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"],
},
{
DatasetConstants.TEST: "create_match.py",
DatasetConstants.OPTIONS: ["--vertex-count", "40000", "--create-pack-size", "100"],