Fix memory tracking issues (#150)

This commit is contained in:
antonio2368 2021-05-18 18:42:19 +02:00 committed by GitHub
parent 30413a7b4f
commit 999b3ef79f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 163 additions and 47 deletions

View File

@ -6,6 +6,8 @@
* Fixed parsing of types for Python procedures for types nested in `mgp.List`.
For example, parsing of `mgp.List[mgp.Map]` works now.
* Fixed memory tracking issues. Some of the allocation and deallocation weren't
tracked during the query execution.
* Fixed reading CSV files that are using CRLF as the newline symbol.
## v1.4.0

View File

@ -230,5 +230,5 @@ pushd jemalloc
# avoid spurious latencies and additional work associated with
# MADV_DONTNEED. See
# https://github.com/ClickHouse/ClickHouse/issues/11121 for motivation.
./autogen.sh --with-malloc-conf="percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:10000"
./autogen.sh --with-malloc-conf="percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000"
popd

View File

@ -604,8 +604,8 @@ struct PullPlanVector {
struct PullPlan {
explicit PullPlan(std::shared_ptr<CachedPlan> plan, const Parameters &parameters, bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context,
utils::MonotonicBufferResource *execution_memory, std::optional<size_t> memory_limit = {});
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
std::optional<size_t> memory_limit = {});
std::optional<ExecutionContext> Pull(AnyStream *stream, std::optional<int> n,
const std::vector<Symbol> &output_symbols,
std::map<std::string, TypedValue> *summary);
@ -632,8 +632,8 @@ struct PullPlan {
};
PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &parameters, const bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context,
utils::MonotonicBufferResource *execution_memory, const std::optional<size_t> memory_limit)
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
const std::optional<size_t> memory_limit)
: plan_(plan),
cursor_(plan->plan().MakeCursor(execution_memory)),
frame_(plan->symbol_table().max_position(), execution_memory),
@ -835,7 +835,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
InterpreterContext *interpreter_context, DbAccessor *dba,
utils::MonotonicBufferResource *execution_memory) {
utils::MemoryResource *execution_memory) {
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
Frame frame(0);
@ -886,7 +886,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
InterpreterContext *interpreter_context, DbAccessor *dba,
utils::MonotonicBufferResource *execution_memory) {
utils::MemoryResource *execution_memory) {
const std::string kExplainQueryStart = "explain ";
MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kExplainQueryStart),
"Expected stripped query to start with '{}'", kExplainQueryStart);
@ -932,7 +932,7 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string
PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
DbAccessor *dba, utils::MonotonicBufferResource *execution_memory) {
DbAccessor *dba, utils::MemoryResource *execution_memory) {
const std::string kProfileQueryStart = "profile ";
MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kProfileQueryStart),
@ -1015,7 +1015,7 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
}
PreparedQuery PrepareDumpQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary, DbAccessor *dba,
utils::MonotonicBufferResource *execution_memory) {
utils::MemoryResource *execution_memory) {
return PreparedQuery{{"QUERY"},
std::move(parsed_query.required_privileges),
[pull_plan = std::make_shared<PullPlanDump>(dba)](
@ -1030,7 +1030,7 @@ PreparedQuery PrepareDumpQuery(ParsedQuery parsed_query, std::map<std::string, T
PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
utils::MonotonicBufferResource *execution_memory) {
utils::MemoryResource *execution_memory) {
if (in_explicit_transaction) {
throw IndexInMulticommandTxException();
}
@ -1099,7 +1099,7 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
DbAccessor *dba, utils::MonotonicBufferResource *execution_memory) {
DbAccessor *dba, utils::MemoryResource *execution_memory) {
if (in_explicit_transaction) {
throw UserModificationInMulticommandTxException();
}
@ -1212,7 +1212,7 @@ PreparedQuery PrepareFreeMemoryQuery(ParsedQuery parsed_query, const bool in_exp
PreparedQuery PrepareInfoQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
storage::Storage *db, utils::MonotonicBufferResource *execution_memory) {
storage::Storage *db, utils::MemoryResource *execution_memory) {
if (in_explicit_transaction) {
throw InfoInMulticommandTxException();
}
@ -1300,8 +1300,7 @@ PreparedQuery PrepareInfoQuery(ParsedQuery parsed_query, bool in_explicit_transa
PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary,
InterpreterContext *interpreter_context,
utils::MonotonicBufferResource *execution_memory) {
InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory) {
if (in_explicit_transaction) {
throw ConstraintInMulticommandTxException();
}

View File

@ -317,7 +317,9 @@ class Interpreter final {
private:
struct QueryExecution {
std::optional<PreparedQuery> prepared_query;
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
utils::MonotonicBufferResource execution_monotonic_memory{kExecutionMemoryBlockSize};
utils::ResourceWithOutOfMemoryException execution_memory{&execution_monotonic_memory};
std::map<std::string, TypedValue> summary;
explicit QueryExecution() = default;
@ -331,7 +333,7 @@ class Interpreter final {
// destroy the prepared query which is using that instance
// of execution memory.
prepared_query.reset();
execution_memory.Release();
execution_monotonic_memory.Release();
}
};

View File

@ -1,3 +1,4 @@
#include <cstddef>
#include <new>
#if USE_JEMALLOC
@ -10,7 +11,7 @@
#include "utils/memory_tracker.hpp"
namespace {
void *newImpl(std::size_t size) {
void *newImpl(const std::size_t size) {
auto *ptr = malloc(size);
if (LIKELY(ptr != nullptr)) {
return ptr;
@ -19,11 +20,26 @@ void *newImpl(std::size_t size) {
throw std::bad_alloc{};
}
void *newNoExcept(const std::size_t size) noexcept { return malloc(size); }
void *newImpl(const std::size_t size, const std::align_val_t align) {
auto *ptr = aligned_alloc(static_cast<std::size_t>(align), size);
if (LIKELY(ptr != nullptr)) {
return ptr;
}
void deleteImpl(void *ptr) noexcept { free(ptr); }
throw std::bad_alloc{};
}
void *newNoExcept(const std::size_t size) noexcept { return malloc(size); }
void *newNoExcept(const std::size_t size, const std::align_val_t align) noexcept {
return aligned_alloc(size, static_cast<std::size_t>(align));
}
#if USE_JEMALLOC
void deleteImpl(void *ptr) noexcept { dallocx(ptr, 0); }
void deleteImpl(void *ptr, const std::align_val_t align) noexcept {
dallocx(ptr, MALLOCX_ALIGN(align)); // NOLINT(hicpp-signed-bitwise)
}
void deleteSized(void *ptr, const std::size_t size) noexcept {
if (UNLIKELY(ptr == nullptr)) {
@ -33,24 +49,43 @@ void deleteSized(void *ptr, const std::size_t size) noexcept {
sdallocx(ptr, size, 0);
}
void deleteSized(void *ptr, const std::size_t size, const std::align_val_t align) noexcept {
if (UNLIKELY(ptr == nullptr)) {
return;
}
sdallocx(ptr, size, MALLOCX_ALIGN(align)); // NOLINT(hicpp-signed-bitwise)
}
#else
void deleteImpl(void *ptr) noexcept { free(ptr); }
void deleteImpl(void *ptr, const std::align_val_t /*unused*/) noexcept { free(ptr); }
void deleteSized(void *ptr, const std::size_t /*unused*/) noexcept { free(ptr); }
void deleteSized(void *ptr, const std::size_t /*unused*/, const std::align_val_t /*unused*/) noexcept { free(ptr); }
#endif
void TrackMemory(const size_t size) {
size_t actual_size = size;
void TrackMemory(std::size_t size) {
#if USE_JEMALLOC
if (LIKELY(size != 0)) {
actual_size = nallocx(size, 0);
size = nallocx(size, 0);
}
#endif
utils::total_memory_tracker.Alloc(actual_size);
utils::total_memory_tracker.Alloc(size);
}
bool TrackMemoryNoExcept(const size_t size) {
void TrackMemory(std::size_t size, const std::align_val_t align) {
#if USE_JEMALLOC
if (LIKELY(size != 0)) {
size = nallocx(size, MALLOCX_ALIGN(align)); // NOLINT(hicpp-signed-bitwise)
}
#endif
utils::total_memory_tracker.Alloc(size);
}
bool TrackMemoryNoExcept(const std::size_t size) {
try {
TrackMemory(size);
} catch (...) {
@ -60,7 +95,17 @@ bool TrackMemoryNoExcept(const size_t size) {
return true;
}
void UntrackMemory([[maybe_unused]] void *ptr, [[maybe_unused]] size_t size = 0) noexcept {
bool TrackMemoryNoExcept(const std::size_t size, const std::align_val_t align) {
try {
TrackMemory(size, align);
} catch (...) {
return false;
}
return true;
}
void UntrackMemory([[maybe_unused]] void *ptr, [[maybe_unused]] std::size_t size = 0) noexcept {
try {
#if USE_JEMALLOC
if (LIKELY(ptr != nullptr)) {
@ -78,32 +123,74 @@ void UntrackMemory([[maybe_unused]] void *ptr, [[maybe_unused]] size_t size = 0)
}
}
void UntrackMemory(void *ptr, const std::align_val_t align, [[maybe_unused]] std::size_t size = 0) noexcept {
try {
#if USE_JEMALLOC
if (LIKELY(ptr != nullptr)) {
utils::total_memory_tracker.Free(sallocx(ptr, MALLOCX_ALIGN(align))); // NOLINT(hicpp-signed-bitwise)
}
#else
if (size) {
utils::total_memory_tracker.Free(size);
} else {
// Innaccurate because malloc_usable_size() result is greater or equal to allocated size.
utils::total_memory_tracker.Free(malloc_usable_size(ptr));
}
#endif
} catch (...) {
}
}
} // namespace
void *operator new(std::size_t size) {
void *operator new(const std::size_t size) {
TrackMemory(size);
return newImpl(size);
}
void *operator new[](std::size_t size) {
void *operator new[](const std::size_t size) {
TrackMemory(size);
return newImpl(size);
}
void *operator new(std::size_t size, const std::nothrow_t & /*unused*/) noexcept {
void *operator new(const std::size_t size, const std::align_val_t align) {
TrackMemory(size, align);
return newImpl(size, align);
}
void *operator new[](const std::size_t size, const std::align_val_t align) {
TrackMemory(size, align);
return newImpl(size, align);
}
void *operator new(const std::size_t size, const std::nothrow_t & /*unused*/) noexcept {
if (LIKELY(TrackMemoryNoExcept(size))) {
return newNoExcept(size);
}
return nullptr;
}
void *operator new[](std::size_t size, const std::nothrow_t & /*unused*/) noexcept {
void *operator new[](const std::size_t size, const std::nothrow_t & /*unused*/) noexcept {
if (LIKELY(TrackMemoryNoExcept(size))) {
return newNoExcept(size);
}
return nullptr;
}
void *operator new(const std::size_t size, const std::align_val_t align, const std::nothrow_t & /*unused*/) noexcept {
if (LIKELY(TrackMemoryNoExcept(size, align))) {
return newNoExcept(size, align);
}
return nullptr;
}
void *operator new[](const std::size_t size, const std::align_val_t align, const std::nothrow_t & /*unused*/) noexcept {
if (LIKELY(TrackMemoryNoExcept(size, align))) {
return newNoExcept(size, align);
}
return nullptr;
}
void operator delete(void *ptr) noexcept {
UntrackMemory(ptr);
deleteImpl(ptr);
@ -114,12 +201,52 @@ void operator delete[](void *ptr) noexcept {
deleteImpl(ptr);
}
void operator delete(void *ptr, std::size_t size) noexcept {
void operator delete(void *ptr, const std::align_val_t align) noexcept {
UntrackMemory(ptr, align);
deleteImpl(ptr, align);
}
void operator delete[](void *ptr, const std::align_val_t align) noexcept {
UntrackMemory(ptr, align);
deleteImpl(ptr, align);
}
void operator delete(void *ptr, const std::size_t size) noexcept {
UntrackMemory(ptr, size);
deleteSized(ptr, size);
}
void operator delete[](void *ptr, std::size_t size) noexcept {
void operator delete[](void *ptr, const std::size_t size) noexcept {
UntrackMemory(ptr, size);
deleteSized(ptr, size);
}
void operator delete(void *ptr, const std::size_t size, const std::align_val_t align) noexcept {
UntrackMemory(ptr, align, size);
deleteSized(ptr, size, align);
}
void operator delete[](void *ptr, const std::size_t size, const std::align_val_t align) noexcept {
UntrackMemory(ptr, align, size);
deleteSized(ptr, size, align);
}
void operator delete(void *ptr, const std::nothrow_t & /*unused*/) noexcept {
UntrackMemory(ptr);
deleteImpl(ptr);
}
void operator delete[](void *ptr, const std::nothrow_t & /*unused*/) noexcept {
UntrackMemory(ptr);
deleteImpl(ptr);
}
void operator delete(void *ptr, const std::align_val_t align, const std::nothrow_t & /*unused*/) noexcept {
UntrackMemory(ptr, align);
deleteImpl(ptr, align);
}
void operator delete[](void *ptr, const std::align_val_t align, const std::nothrow_t & /*unused*/) noexcept {
UntrackMemory(ptr, align);
deleteImpl(ptr, align);
}

View File

@ -38,19 +38,5 @@ int main(int argc, char **argv) {
}
spdlog::info("Memgraph is out of memory");
spdlog::info("Cleaning up unused memory");
client->Execute("MATCH (n) DETACH DELETE n;");
client->DiscardAll();
client->Execute("FREE MEMORY;");
client->DiscardAll();
// now it should succeed
spdlog::info("Retrying the query with the memory cleaned up");
client->Execute(create_query);
if (!client->FetchOne()) {
LOG_FATAL("Memgraph is still out of memory");
}
return 0;
}

View File

@ -2,7 +2,7 @@ bolt_port: &bolt_port "7687"
template_cluster: &template_cluster
cluster:
main:
args: ["--bolt-port", *bolt_port, "--memory-limit=500", "--storage-gc-cycle-sec=180", "--log-level=TRACE"]
args: ["--bolt-port", *bolt_port, "--memory-limit=1000", "--storage-gc-cycle-sec=180", "--log-level=TRACE"]
log_file: "memory-e2e.log"
setup_queries: []
validation_queries: []