This commit is contained in:
antoniofilipovic 2024-01-19 14:57:41 +01:00
parent c15b62a88d
commit 5385c741ad
3 changed files with 22 additions and 4 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -60,10 +60,10 @@ void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t
unsigned arena_ind) {
// This needs to be before, to throw exception in case of too big alloc
if (*commit) [[likely]] {
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
GetQueriesMemoryControl().TrackAllocOnCurrentThread(size);
}
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
}
auto *ptr = old_hooks->alloc(extent_hooks, new_addr, size, alignment, zero, commit, arena_ind);

View File

@ -1646,6 +1646,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
MG_ASSERT(transaction_id.has_value());
if (memory_limit_) {
std::cout << "Memory tracking transaction id: " << *transaction_id << std::endl;
memgraph::memory::TryStartTrackingOnTransaction(*transaction_id, *memory_limit_);
memgraph::memory::StartTrackingCurrentThreadTransaction(*transaction_id);
}
@ -4172,6 +4173,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
utils::Downcast<ConstraintQuery>(parsed_query.query) != nullptr ||
upper_case_query.find(kSchemaAssert) != std::string::npos;
SetupDatabaseTransaction(could_commit, unique);
query_execution_ptr->get()->transaction_id_ = *current_db_.db_transactional_accessor_->GetTransactionId();
}
#ifdef MG_ENTERPRISE

View File

@ -312,6 +312,7 @@ class Interpreter final {
std::map<std::string, TypedValue> summary;
std::vector<Notification> notifications;
uint64_t transaction_id_;
static auto Create(std::variant<utils::MonotonicBufferResource, utils::PoolResource> memory_resource,
std::optional<PreparedQuery> prepared_query = std::nullopt) -> std::unique_ptr<QueryExecution> {
@ -407,7 +408,10 @@ std::map<std::string, TypedValue> Interpreter::Pull(TStream *result_stream, std:
auto &query_execution = query_executions_[qid_value];
MG_ASSERT(query_execution && query_execution->prepared_query, "Query already finished executing!");
if (query_execution->transaction_id_ != *current_transaction_) {
std::cout << "Transaction id not same:" << query_execution->transaction_id_ << " != " << *current_transaction_
<< std::endl;
}
// Each prepared query has its own summary so we need to somehow preserve
// it after it finishes executing because it gets destroyed alongside
// the prepared query and its execution memory.
@ -427,8 +431,13 @@ std::map<std::string, TypedValue> Interpreter::Pull(TStream *result_stream, std:
// If the query finished executing, we have received a value which tells
// us what to do after.
if (maybe_res) {
if (query_execution->transaction_id_) {
std::cout << "removing memory tracking on id: " << query_execution->transaction_id_ << std::endl;
memgraph::memory::TryStopTrackingOnTransaction(query_execution->transaction_id_);
}
if (current_transaction_) {
memgraph::memory::TryStopTrackingOnTransaction(*current_transaction_);
std::cout << "[BEFORE] removing memory tracking on id: " << *current_transaction_ << std::endl;
// memgraph::memory::TryStopTrackingOnTransaction(query_execution->transaction_id_);
}
// Save its summary
maybe_summary.emplace(std::move(query_execution->summary));
@ -444,9 +453,13 @@ std::map<std::string, TypedValue> Interpreter::Pull(TStream *result_stream, std:
switch (*maybe_res) {
case QueryHandlerResult::COMMIT:
Commit();
break;
case QueryHandlerResult::ABORT:
Abort();
// if (current_transaction_) {
// memgraph::memory::TryStopTrackingOnTransaction(query_execution->transaction_id_);
// }
break;
case QueryHandlerResult::NOTHING:
// The only cases in which we have nothing to do are those where
@ -464,6 +477,9 @@ std::map<std::string, TypedValue> Interpreter::Pull(TStream *result_stream, std:
// We can only clear this execution as some of the queries
// in the transaction can be in unfinished state
query_execution.reset(nullptr);
// if (current_transaction_) {
// memgraph::memory::TryStopTrackingOnTransaction(*current_transaction_);
// }
}
}
} catch (const ExplicitTransactionUsageException &) {