Merge branch 'master' into Implement-constant-time-label-and-edge-type-retrieval
This commit is contained in:
commit
2946d74fdd
@ -24,6 +24,7 @@ namespace memgraph::dbms {
|
||||
Database::Database(storage::Config config, const replication::ReplicationState &repl_state)
|
||||
: trigger_store_(config.durability.storage_directory / "triggers"),
|
||||
streams_{config.durability.storage_directory / "streams"},
|
||||
plan_cache_{FLAGS_query_plan_cache_max_size},
|
||||
repl_state_(&repl_state) {
|
||||
if (config.storage_mode == memgraph::storage::StorageMode::ON_DISK_TRANSACTIONAL || config.force_on_disk ||
|
||||
utils::DirExists(config.disk.main_storage_directory)) {
|
||||
|
@ -24,6 +24,8 @@
|
||||
#include "query/trigger.hpp"
|
||||
#include "storage/v2/storage.hpp"
|
||||
#include "utils/gatekeeper.hpp"
|
||||
#include "utils/lru_cache.hpp"
|
||||
#include "utils/synchronized.hpp"
|
||||
|
||||
namespace memgraph::dbms {
|
||||
|
||||
@ -146,9 +148,9 @@ class Database {
|
||||
/**
|
||||
* @brief Returns the PlanCache vector raw pointer
|
||||
*
|
||||
* @return utils::SkipList<query::PlanCacheEntry>*
|
||||
* @return utils::Synchronized<utils::LRUCache<uint64_t, std::shared_ptr<PlanWrapper>>, utils::RWSpinLock>
|
||||
*/
|
||||
utils::SkipList<query::PlanCacheEntry> *plan_cache() { return &plan_cache_; }
|
||||
query::PlanCacheLRU *plan_cache() { return &plan_cache_; }
|
||||
|
||||
private:
|
||||
std::unique_ptr<storage::Storage> storage_; //!< Underlying storage
|
||||
@ -157,7 +159,7 @@ class Database {
|
||||
query::stream::Streams streams_; //!< Streams associated with the storage
|
||||
|
||||
// TODO: Move to a better place
|
||||
utils::SkipList<query::PlanCacheEntry> plan_cache_; //!< Plan cache associated with the storage
|
||||
query::PlanCacheLRU plan_cache_; //!< Plan cache associated with the storage
|
||||
|
||||
const replication::ReplicationState *repl_state_;
|
||||
};
|
||||
|
@ -62,10 +62,7 @@ void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t
|
||||
if (*commit) [[likely]] {
|
||||
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread();
|
||||
if (memory_tracker != nullptr) [[likely]] {
|
||||
memory_tracker->Alloc(static_cast<int64_t>(size));
|
||||
}
|
||||
GetQueriesMemoryControl().TrackAllocOnCurrentThread(size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -74,10 +71,7 @@ void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t
|
||||
if (*commit) {
|
||||
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread();
|
||||
if (memory_tracker != nullptr) [[likely]] {
|
||||
memory_tracker->Free(static_cast<int64_t>(size));
|
||||
}
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
}
|
||||
return ptr;
|
||||
@ -97,10 +91,7 @@ static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, boo
|
||||
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
|
||||
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread();
|
||||
if (memory_tracker != nullptr) [[likely]] {
|
||||
memory_tracker->Free(static_cast<int64_t>(size));
|
||||
}
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -111,10 +102,7 @@ static void my_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size, bo
|
||||
if (committed) [[likely]] {
|
||||
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread();
|
||||
if (memory_tracker != nullptr) [[likely]] {
|
||||
memory_tracker->Free(static_cast<int64_t>(size));
|
||||
}
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -131,10 +119,7 @@ static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, siz
|
||||
|
||||
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length));
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread();
|
||||
if (memory_tracker != nullptr) [[likely]] {
|
||||
memory_tracker->Alloc(static_cast<int64_t>(size));
|
||||
}
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
|
||||
return false;
|
||||
@ -151,10 +136,7 @@ static bool my_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size, s
|
||||
|
||||
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(length));
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread();
|
||||
if (memory_tracker != nullptr) [[likely]] {
|
||||
memory_tracker->Free(static_cast<int64_t>(size));
|
||||
}
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
|
||||
return false;
|
||||
@ -171,10 +153,7 @@ static bool my_purge_forced(extent_hooks_t *extent_hooks, void *addr, size_t siz
|
||||
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(length));
|
||||
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread();
|
||||
if (memory_tracker != nullptr) [[likely]] {
|
||||
memory_tracker->Alloc(static_cast<int64_t>(size));
|
||||
}
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
|
||||
return false;
|
||||
|
@ -10,6 +10,7 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
#include <cstdint>
|
||||
#include <iostream>
|
||||
#include <optional>
|
||||
@ -58,34 +59,58 @@ void QueriesMemoryControl::EraseThreadToTransactionId(const std::thread::id &thr
|
||||
accessor.remove(thread_id);
|
||||
}
|
||||
|
||||
utils::MemoryTracker *QueriesMemoryControl::GetTrackerCurrentThread() {
|
||||
void QueriesMemoryControl::TrackAllocOnCurrentThread(size_t size) {
|
||||
auto thread_id_to_transaction_id_accessor = thread_id_to_transaction_id.access();
|
||||
|
||||
// we might be just constructing mapping between thread id and transaction id
|
||||
// so we miss this allocation
|
||||
auto thread_id_to_transaction_id_elem = thread_id_to_transaction_id_accessor.find(std::this_thread::get_id());
|
||||
if (thread_id_to_transaction_id_elem == thread_id_to_transaction_id_accessor.end()) {
|
||||
return nullptr;
|
||||
return;
|
||||
}
|
||||
|
||||
auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access();
|
||||
auto transaction_id_to_tracker =
|
||||
transaction_id_to_tracker_accessor.find(thread_id_to_transaction_id_elem->transaction_id);
|
||||
return &transaction_id_to_tracker->tracker;
|
||||
|
||||
// It can happen that some allocation happens between mapping thread to
|
||||
// transaction id, so we miss this allocation
|
||||
if (transaction_id_to_tracker == transaction_id_to_tracker_accessor.end()) [[unlikely]] {
|
||||
return;
|
||||
}
|
||||
auto &query_tracker = transaction_id_to_tracker->tracker;
|
||||
query_tracker.TrackAlloc(size);
|
||||
}
|
||||
|
||||
void QueriesMemoryControl::TrackFreeOnCurrentThread(size_t size) {
|
||||
auto thread_id_to_transaction_id_accessor = thread_id_to_transaction_id.access();
|
||||
|
||||
// we might be just constructing mapping between thread id and transaction id
|
||||
// so we miss this allocation
|
||||
auto thread_id_to_transaction_id_elem = thread_id_to_transaction_id_accessor.find(std::this_thread::get_id());
|
||||
if (thread_id_to_transaction_id_elem == thread_id_to_transaction_id_accessor.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access();
|
||||
auto transaction_id_to_tracker =
|
||||
transaction_id_to_tracker_accessor.find(thread_id_to_transaction_id_elem->transaction_id);
|
||||
|
||||
// It can happen that some allocation happens between mapping thread to
|
||||
// transaction id, so we miss this allocation
|
||||
if (transaction_id_to_tracker == transaction_id_to_tracker_accessor.end()) [[unlikely]] {
|
||||
return;
|
||||
}
|
||||
auto &query_tracker = transaction_id_to_tracker->tracker;
|
||||
query_tracker.TrackFree(size);
|
||||
}
|
||||
|
||||
void QueriesMemoryControl::CreateTransactionIdTracker(uint64_t transaction_id, size_t inital_limit) {
|
||||
auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access();
|
||||
|
||||
auto [elem, result] = transaction_id_to_tracker_accessor.insert({transaction_id, utils::MemoryTracker{}});
|
||||
auto [elem, result] = transaction_id_to_tracker_accessor.insert({transaction_id, utils::QueryMemoryTracker{}});
|
||||
|
||||
elem->tracker.SetMaximumHardLimit(inital_limit);
|
||||
elem->tracker.SetHardLimit(inital_limit);
|
||||
}
|
||||
|
||||
bool QueriesMemoryControl::CheckTransactionIdTrackerExists(uint64_t transaction_id) {
|
||||
auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access();
|
||||
return transaction_id_to_tracker_accessor.contains(transaction_id);
|
||||
elem->tracker.SetQueryLimit(inital_limit);
|
||||
}
|
||||
|
||||
bool QueriesMemoryControl::EraseTransactionIdTracker(uint64_t transaction_id) {
|
||||
@ -102,6 +127,45 @@ void QueriesMemoryControl::InitializeArenaCounter(unsigned arena_ind) {
|
||||
arena_tracking[arena_ind].store(0, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
bool QueriesMemoryControl::CheckTransactionIdTrackerExists(uint64_t transaction_id) {
|
||||
auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access();
|
||||
return transaction_id_to_tracker_accessor.contains(transaction_id);
|
||||
}
|
||||
|
||||
void QueriesMemoryControl::TryCreateTransactionProcTracker(uint64_t transaction_id, int64_t procedure_id,
|
||||
size_t limit) {
|
||||
auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access();
|
||||
auto query_tracker = transaction_id_to_tracker_accessor.find(transaction_id);
|
||||
|
||||
if (query_tracker == transaction_id_to_tracker_accessor.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
query_tracker->tracker.TryCreateProcTracker(procedure_id, limit);
|
||||
}
|
||||
|
||||
void QueriesMemoryControl::SetActiveProcIdTracker(uint64_t transaction_id, int64_t procedure_id) {
|
||||
auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access();
|
||||
auto query_tracker = transaction_id_to_tracker_accessor.find(transaction_id);
|
||||
|
||||
if (query_tracker == transaction_id_to_tracker_accessor.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
query_tracker->tracker.SetActiveProc(procedure_id);
|
||||
}
|
||||
|
||||
void QueriesMemoryControl::PauseProcedureTracking(uint64_t transaction_id) {
|
||||
auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access();
|
||||
auto query_tracker = transaction_id_to_tracker_accessor.find(transaction_id);
|
||||
|
||||
if (query_tracker == transaction_id_to_tracker_accessor.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
query_tracker->tracker.StopProcTracking();
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
void StartTrackingCurrentThreadTransaction(uint64_t transaction_id) {
|
||||
@ -137,4 +201,29 @@ void TryStopTrackingOnTransaction(uint64_t transaction_id) {
|
||||
#endif
|
||||
}
|
||||
|
||||
#if USE_JEMALLOC
|
||||
bool IsTransactionTracked(uint64_t transaction_id) {
|
||||
return GetQueriesMemoryControl().CheckTransactionIdTrackerExists(transaction_id);
|
||||
}
|
||||
#else
|
||||
bool IsTransactionTracked(uint64_t /*transaction_id*/) { return false; }
|
||||
#endif
|
||||
|
||||
void CreateOrContinueProcedureTracking(uint64_t transaction_id, int64_t procedure_id, size_t limit) {
|
||||
#if USE_JEMALLOC
|
||||
if (!GetQueriesMemoryControl().CheckTransactionIdTrackerExists(transaction_id)) {
|
||||
LOG_FATAL("Memory tracker for transaction was not set");
|
||||
}
|
||||
|
||||
GetQueriesMemoryControl().TryCreateTransactionProcTracker(transaction_id, procedure_id, limit);
|
||||
GetQueriesMemoryControl().SetActiveProcIdTracker(transaction_id, procedure_id);
|
||||
#endif
|
||||
}
|
||||
|
||||
void PauseProcedureTracking(uint64_t transaction_id) {
|
||||
#if USE_JEMALLOC
|
||||
GetQueriesMemoryControl().PauseProcedureTracking(transaction_id);
|
||||
#endif
|
||||
}
|
||||
|
||||
} // namespace memgraph::memory
|
||||
|
@ -16,10 +16,13 @@
|
||||
#include <unordered_map>
|
||||
|
||||
#include "utils/memory_tracker.hpp"
|
||||
#include "utils/query_memory_tracker.hpp"
|
||||
#include "utils/skip_list.hpp"
|
||||
|
||||
namespace memgraph::memory {
|
||||
|
||||
static constexpr int64_t UNLIMITED_MEMORY{0};
|
||||
|
||||
#if USE_JEMALLOC
|
||||
|
||||
// Track memory allocations per query.
|
||||
@ -75,16 +78,21 @@ class QueriesMemoryControl {
|
||||
// Important to reset if one thread gets reused for different transaction
|
||||
void EraseThreadToTransactionId(const std::thread::id &, uint64_t);
|
||||
|
||||
// C-API functionality for thread to transaction mapping
|
||||
void UpdateThreadToTransactionId(const char *, uint64_t);
|
||||
// Find tracker for current thread if exists, track
|
||||
// query allocation and procedure allocation if
|
||||
// necessary
|
||||
void TrackAllocOnCurrentThread(size_t size);
|
||||
|
||||
// C-API functionality for thread to transaction unmapping
|
||||
void EraseThreadToTransactionId(const char *, uint64_t);
|
||||
// Find tracker for current thread if exists, track
|
||||
// query allocation and procedure allocation if
|
||||
// necessary
|
||||
void TrackFreeOnCurrentThread(size_t size);
|
||||
|
||||
// Get tracker to current thread if exists, otherwise return
|
||||
// nullptr. This can happen only if tracker is still
|
||||
// being constructed.
|
||||
utils::MemoryTracker *GetTrackerCurrentThread();
|
||||
void TryCreateTransactionProcTracker(uint64_t, int64_t, size_t);
|
||||
|
||||
void SetActiveProcIdTracker(uint64_t, int64_t);
|
||||
|
||||
void PauseProcedureTracking(uint64_t);
|
||||
|
||||
private:
|
||||
std::unordered_map<unsigned, std::atomic<int>> arena_tracking;
|
||||
@ -102,7 +110,7 @@ class QueriesMemoryControl {
|
||||
|
||||
struct TransactionIdToTracker {
|
||||
uint64_t transaction_id;
|
||||
utils::MemoryTracker tracker;
|
||||
utils::QueryMemoryTracker tracker;
|
||||
|
||||
bool operator<(const TransactionIdToTracker &other) const { return transaction_id < other.transaction_id; }
|
||||
bool operator==(const TransactionIdToTracker &other) const { return transaction_id == other.transaction_id; }
|
||||
@ -138,4 +146,15 @@ void TryStartTrackingOnTransaction(uint64_t transaction_id, size_t limit);
|
||||
// Does nothing if jemalloc is not enabled. Does nothing if tracker doesn't exist
|
||||
void TryStopTrackingOnTransaction(uint64_t transaction_id);
|
||||
|
||||
// Is transaction with given id tracked in memory tracker
|
||||
bool IsTransactionTracked(uint64_t transaction_id);
|
||||
|
||||
// Creates tracker on procedure if doesn't exist. Sets query tracker
|
||||
// to track procedure with id.
|
||||
void CreateOrContinueProcedureTracking(uint64_t transaction_id, int64_t procedure_id, size_t limit);
|
||||
|
||||
// Pauses procedure tracking. This enables to continue
|
||||
// tracking on procedure once procedure execution resumes.
|
||||
void PauseProcedureTracking(uint64_t transaction_id);
|
||||
|
||||
} // namespace memgraph::memory
|
||||
|
@ -12,15 +12,16 @@
|
||||
#include "query/cypher_query_interpreter.hpp"
|
||||
#include "query/frontend/ast/cypher_main_visitor.hpp"
|
||||
#include "query/frontend/opencypher/parser.hpp"
|
||||
#include "utils/synchronized.hpp"
|
||||
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_bool(query_cost_planner, true, "Use the cost-estimating query planner.");
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60, "Time to live for cached query plans, in seconds.",
|
||||
DEFINE_VALIDATED_int32(query_plan_cache_max_size, 1000, "Maximum number of query plans to cache.",
|
||||
FLAG_IN_RANGE(0, std::numeric_limits<int32_t>::max()));
|
||||
|
||||
namespace memgraph::query {
|
||||
CachedPlan::CachedPlan(std::unique_ptr<LogicalPlan> plan) : plan_(std::move(plan)) {}
|
||||
PlanWrapper::PlanWrapper(std::unique_ptr<LogicalPlan> plan) : plan_(std::move(plan)) {}
|
||||
|
||||
ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::string, storage::PropertyValue> ¶ms,
|
||||
utils::SkipList<QueryCacheEntry> *cache, const InterpreterConfig::Query &query_config) {
|
||||
@ -127,28 +128,24 @@ std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery
|
||||
std::move(symbol_table));
|
||||
}
|
||||
|
||||
std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
|
||||
const Parameters ¶meters, utils::SkipList<PlanCacheEntry> *plan_cache,
|
||||
DbAccessor *db_accessor,
|
||||
const std::vector<Identifier *> &predefined_identifiers) {
|
||||
std::optional<utils::SkipList<PlanCacheEntry>::Accessor> plan_cache_access;
|
||||
std::shared_ptr<PlanWrapper> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
|
||||
const Parameters ¶meters, PlanCacheLRU *plan_cache,
|
||||
DbAccessor *db_accessor,
|
||||
const std::vector<Identifier *> &predefined_identifiers) {
|
||||
if (plan_cache) {
|
||||
plan_cache_access.emplace(plan_cache->access());
|
||||
auto it = plan_cache_access->find(hash);
|
||||
if (it != plan_cache_access->end()) {
|
||||
if (it->second->IsExpired()) {
|
||||
plan_cache_access->remove(hash);
|
||||
} else {
|
||||
return it->second;
|
||||
}
|
||||
auto existing_plan = plan_cache->WithLock([&](auto &cache) { return cache.get(hash); });
|
||||
if (existing_plan.has_value()) {
|
||||
return existing_plan.value();
|
||||
}
|
||||
}
|
||||
|
||||
auto plan = std::make_shared<CachedPlan>(
|
||||
auto plan = std::make_shared<PlanWrapper>(
|
||||
MakeLogicalPlan(std::move(ast_storage), query, parameters, db_accessor, predefined_identifiers));
|
||||
if (plan_cache_access) {
|
||||
plan_cache_access->insert({hash, plan});
|
||||
|
||||
if (plan_cache) {
|
||||
plan_cache->WithLock([&](auto &cache) { cache.put(hash, plan); });
|
||||
}
|
||||
|
||||
return plan;
|
||||
}
|
||||
} // namespace memgraph::query
|
||||
|
@ -17,12 +17,14 @@
|
||||
#include "query/frontend/stripped.hpp"
|
||||
#include "query/plan/planner.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
#include "utils/lru_cache.hpp"
|
||||
#include "utils/synchronized.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_bool(query_cost_planner);
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_int32(query_plan_cache_ttl);
|
||||
DECLARE_int32(query_plan_cache_max_size);
|
||||
|
||||
namespace memgraph::query {
|
||||
|
||||
@ -45,23 +47,17 @@ class LogicalPlan {
|
||||
virtual const AstStorage &GetAstStorage() const = 0;
|
||||
};
|
||||
|
||||
class CachedPlan {
|
||||
class PlanWrapper {
|
||||
public:
|
||||
explicit CachedPlan(std::unique_ptr<LogicalPlan> plan);
|
||||
explicit PlanWrapper(std::unique_ptr<LogicalPlan> plan);
|
||||
|
||||
const auto &plan() const { return plan_->GetRoot(); }
|
||||
double cost() const { return plan_->GetCost(); }
|
||||
const auto &symbol_table() const { return plan_->GetSymbolTable(); }
|
||||
const auto &ast_storage() const { return plan_->GetAstStorage(); }
|
||||
|
||||
bool IsExpired() const {
|
||||
// NOLINTNEXTLINE (modernize-use-nullptr)
|
||||
return cache_timer_.Elapsed() > std::chrono::seconds(FLAGS_query_plan_cache_ttl);
|
||||
};
|
||||
|
||||
private:
|
||||
std::unique_ptr<LogicalPlan> plan_;
|
||||
utils::Timer cache_timer_;
|
||||
};
|
||||
|
||||
struct CachedQuery {
|
||||
@ -82,18 +78,6 @@ struct QueryCacheEntry {
|
||||
CachedQuery second;
|
||||
};
|
||||
|
||||
struct PlanCacheEntry {
|
||||
bool operator==(const PlanCacheEntry &other) const { return first == other.first; }
|
||||
bool operator<(const PlanCacheEntry &other) const { return first < other.first; }
|
||||
bool operator==(const uint64_t &other) const { return first == other; }
|
||||
bool operator<(const uint64_t &other) const { return first < other; }
|
||||
|
||||
uint64_t first;
|
||||
// TODO: Maybe store the query string here and use it as a key with the hash
|
||||
// so that we eliminate the risk of hash collisions.
|
||||
std::shared_ptr<CachedPlan> second;
|
||||
};
|
||||
|
||||
/**
|
||||
* A container for data related to the parsing of a query.
|
||||
*/
|
||||
@ -129,6 +113,9 @@ class SingleNodeLogicalPlan final : public LogicalPlan {
|
||||
SymbolTable symbol_table_;
|
||||
};
|
||||
|
||||
using PlanCacheLRU =
|
||||
utils::Synchronized<utils::LRUCache<uint64_t, std::shared_ptr<query::PlanWrapper>>, utils::RWSpinLock>;
|
||||
|
||||
std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters,
|
||||
DbAccessor *db_accessor,
|
||||
const std::vector<Identifier *> &predefined_identifiers);
|
||||
@ -141,9 +128,9 @@ std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery
|
||||
* If an identifier is contained there, we inject it at that place and remove it,
|
||||
* because a predefined identifier can be used only in one scope.
|
||||
*/
|
||||
std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
|
||||
const Parameters ¶meters, utils::SkipList<PlanCacheEntry> *plan_cache,
|
||||
DbAccessor *db_accessor,
|
||||
const std::vector<Identifier *> &predefined_identifiers = {});
|
||||
std::shared_ptr<PlanWrapper> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
|
||||
const Parameters ¶meters, PlanCacheLRU *plan_cache,
|
||||
DbAccessor *db_accessor,
|
||||
const std::vector<Identifier *> &predefined_identifiers = {});
|
||||
|
||||
} // namespace memgraph::query
|
||||
|
@ -43,22 +43,40 @@ struct CachedValue {
|
||||
return cache_.get_allocator().GetMemoryResource();
|
||||
}
|
||||
|
||||
bool CacheValue(const TypedValue &maybe_list) {
|
||||
// Func to check if cache_ contains value
|
||||
bool CacheValue(TypedValue &&maybe_list) {
|
||||
if (!maybe_list.IsList()) {
|
||||
return false;
|
||||
}
|
||||
const auto &list = maybe_list.ValueList();
|
||||
auto &list = maybe_list.ValueList();
|
||||
TypedValue::Hash hash{};
|
||||
for (const TypedValue &list_elem : list) {
|
||||
const auto list_elem_key = hash(list_elem);
|
||||
auto &vector_values = cache_[list_elem_key];
|
||||
if (!IsValueInVec(vector_values, list_elem)) {
|
||||
vector_values.push_back(list_elem);
|
||||
for (auto &element : list) {
|
||||
const auto key = hash(element);
|
||||
auto &vector_values = cache_[key];
|
||||
if (!IsValueInVec(vector_values, element)) {
|
||||
vector_values.emplace_back(std::move(element));
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CacheValue(const TypedValue &maybe_list) {
|
||||
if (!maybe_list.IsList()) {
|
||||
return false;
|
||||
}
|
||||
auto &list = maybe_list.ValueList();
|
||||
TypedValue::Hash hash{};
|
||||
for (auto &element : list) {
|
||||
const auto key = hash(element);
|
||||
auto &vector_values = cache_[key];
|
||||
if (!IsValueInVec(vector_values, element)) {
|
||||
vector_values.emplace_back(element);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Func to check if cache_ contains value
|
||||
bool ContainsValue(const TypedValue &value) const {
|
||||
TypedValue::Hash hash{};
|
||||
const auto key = hash(value);
|
||||
|
@ -270,36 +270,33 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
|
||||
}
|
||||
|
||||
TypedValue Visit(InListOperator &in_list) override {
|
||||
TypedValue *_list_ptr = nullptr;
|
||||
TypedValue _list;
|
||||
auto literal = in_list.expression1_->Accept(*this);
|
||||
|
||||
auto get_list_literal = [this, &in_list, &_list, &_list_ptr]() -> void {
|
||||
auto get_list_literal = [this, &in_list]() -> TypedValue {
|
||||
ReferenceExpressionEvaluator reference_expression_evaluator{frame_, symbol_table_, ctx_};
|
||||
_list_ptr = in_list.expression2_->Accept(reference_expression_evaluator);
|
||||
if (nullptr == _list_ptr) {
|
||||
_list = in_list.expression2_->Accept(*this);
|
||||
_list_ptr = &_list;
|
||||
auto *list_ptr = in_list.expression2_->Accept(reference_expression_evaluator);
|
||||
if (nullptr == list_ptr) {
|
||||
return in_list.expression2_->Accept(*this);
|
||||
}
|
||||
return *list_ptr;
|
||||
};
|
||||
|
||||
auto do_list_literal_checks = [this, &literal, &_list_ptr]() -> std::optional<TypedValue> {
|
||||
MG_ASSERT(_list_ptr, "List literal should have been defined");
|
||||
if (_list_ptr->IsNull()) {
|
||||
auto do_list_literal_checks = [this, &literal](const TypedValue &list) -> std::optional<TypedValue> {
|
||||
if (list.IsNull()) {
|
||||
return TypedValue(ctx_->memory);
|
||||
}
|
||||
// Exceptions have higher priority than returning nulls when list expression
|
||||
// is not null.
|
||||
if (_list_ptr->type() != TypedValue::Type::List) {
|
||||
throw QueryRuntimeException("IN expected a list, got {}.", _list_ptr->type());
|
||||
if (list.type() != TypedValue::Type::List) {
|
||||
throw QueryRuntimeException("IN expected a list, got {}.", list.type());
|
||||
}
|
||||
const auto &list = _list_ptr->ValueList();
|
||||
const auto &list_value = list.ValueList();
|
||||
|
||||
// If literal is NULL there is no need to try to compare it with every
|
||||
// element in the list since result of every comparison will be NULL. There
|
||||
// is one special case that we must test explicitly: if list is empty then
|
||||
// result is false since no comparison will be performed.
|
||||
if (list.empty()) return TypedValue(false, ctx_->memory);
|
||||
if (list_value.empty()) return TypedValue(false, ctx_->memory);
|
||||
if (literal.IsNull()) return TypedValue(ctx_->memory);
|
||||
return {};
|
||||
};
|
||||
@ -312,13 +309,13 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
|
||||
if (!frame_change_collector_->IsKeyValueCached(*cached_id)) {
|
||||
// Check only first time if everything is okay, later when we use
|
||||
// cache there is no need to check again as we did check first time
|
||||
get_list_literal();
|
||||
auto preoperational_checks = do_list_literal_checks();
|
||||
auto list = get_list_literal();
|
||||
auto preoperational_checks = do_list_literal_checks(list);
|
||||
if (preoperational_checks) {
|
||||
return std::move(*preoperational_checks);
|
||||
}
|
||||
auto &cached_value = frame_change_collector_->GetCachedValue(*cached_id);
|
||||
cached_value.CacheValue(*_list_ptr);
|
||||
cached_value.CacheValue(std::move(list));
|
||||
spdlog::trace("Value cached {}", *cached_id);
|
||||
}
|
||||
const auto &cached_value = frame_change_collector_->GetCachedValue(*cached_id);
|
||||
@ -334,16 +331,16 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
|
||||
}
|
||||
// When caching is not an option, we need to evaluate list literal every time
|
||||
// and do the checks
|
||||
get_list_literal();
|
||||
auto preoperational_checks = do_list_literal_checks();
|
||||
const auto list = get_list_literal();
|
||||
auto preoperational_checks = do_list_literal_checks(list);
|
||||
if (preoperational_checks) {
|
||||
return std::move(*preoperational_checks);
|
||||
}
|
||||
|
||||
const auto &list = _list.ValueList();
|
||||
const auto &list_value = list.ValueList();
|
||||
spdlog::trace("Not using cache on IN LIST operator");
|
||||
auto has_null = false;
|
||||
for (const auto &element : list) {
|
||||
for (const auto &element : list_value) {
|
||||
auto result = literal == element;
|
||||
if (result.IsNull()) {
|
||||
has_null = true;
|
||||
|
@ -1213,7 +1213,7 @@ struct TxTimeout {
|
||||
};
|
||||
|
||||
struct PullPlan {
|
||||
explicit PullPlan(std::shared_ptr<CachedPlan> plan, const Parameters ¶meters, bool is_profile_query,
|
||||
explicit PullPlan(std::shared_ptr<PlanWrapper> plan, const Parameters ¶meters, bool is_profile_query,
|
||||
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
|
||||
std::optional<std::string> username, std::atomic<TransactionStatus> *transaction_status,
|
||||
std::shared_ptr<utils::AsyncTimer> tx_timer,
|
||||
@ -1226,7 +1226,7 @@ struct PullPlan {
|
||||
std::map<std::string, TypedValue> *summary);
|
||||
|
||||
private:
|
||||
std::shared_ptr<CachedPlan> plan_ = nullptr;
|
||||
std::shared_ptr<PlanWrapper> plan_ = nullptr;
|
||||
plan::UniqueCursorPtr cursor_ = nullptr;
|
||||
Frame frame_;
|
||||
ExecutionContext ctx_;
|
||||
@ -1253,7 +1253,7 @@ struct PullPlan {
|
||||
bool use_monotonic_memory_;
|
||||
};
|
||||
|
||||
PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters ¶meters, const bool is_profile_query,
|
||||
PullPlan::PullPlan(const std::shared_ptr<PlanWrapper> plan, const Parameters ¶meters, const bool is_profile_query,
|
||||
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
|
||||
std::optional<std::string> username, std::atomic<TransactionStatus> *transaction_status,
|
||||
std::shared_ptr<utils::AsyncTimer> tx_timer, TriggerContextCollector *trigger_context_collector,
|
||||
@ -2106,10 +2106,7 @@ PreparedQuery PrepareAnalyzeGraphQuery(ParsedQuery parsed_query, bool in_explici
|
||||
|
||||
// Creating an index influences computed plan costs.
|
||||
auto invalidate_plan_cache = [plan_cache = current_db.db_acc_->get()->plan_cache()] {
|
||||
auto access = plan_cache->access();
|
||||
for (auto &kv : access) {
|
||||
access.remove(kv.first);
|
||||
}
|
||||
plan_cache->WithLock([&](auto &cache) { cache.reset(); });
|
||||
};
|
||||
utils::OnScopeExit cache_invalidator(invalidate_plan_cache);
|
||||
|
||||
@ -2154,10 +2151,7 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
|
||||
|
||||
// Creating an index influences computed plan costs.
|
||||
auto invalidate_plan_cache = [plan_cache = db_acc->plan_cache()] {
|
||||
auto access = plan_cache->access();
|
||||
for (auto &kv : access) {
|
||||
access.remove(kv.first);
|
||||
}
|
||||
plan_cache->WithLock([&](auto &cache) { cache.reset(); });
|
||||
};
|
||||
|
||||
auto *storage = db_acc->storage();
|
||||
|
@ -27,6 +27,7 @@
|
||||
|
||||
#include <cppitertools/chain.hpp>
|
||||
#include <cppitertools/imap.hpp>
|
||||
#include "memory/query_memory_control.hpp"
|
||||
#include "query/common.hpp"
|
||||
#include "spdlog/spdlog.h"
|
||||
|
||||
@ -57,6 +58,7 @@
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
#include "utils/message.hpp"
|
||||
#include "utils/on_scope_exit.hpp"
|
||||
#include "utils/pmr/deque.hpp"
|
||||
#include "utils/pmr/list.hpp"
|
||||
#include "utils/pmr/unordered_map.hpp"
|
||||
@ -4617,7 +4619,7 @@ UniqueCursorPtr OutputTableStream::MakeCursor(utils::MemoryResource *mem) const
|
||||
|
||||
CallProcedure::CallProcedure(std::shared_ptr<LogicalOperator> input, std::string name, std::vector<Expression *> args,
|
||||
std::vector<std::string> fields, std::vector<Symbol> symbols, Expression *memory_limit,
|
||||
size_t memory_scale, bool is_write, bool void_procedure)
|
||||
size_t memory_scale, bool is_write, int64_t procedure_id, bool void_procedure)
|
||||
: input_(input ? input : std::make_shared<Once>()),
|
||||
procedure_name_(name),
|
||||
arguments_(args),
|
||||
@ -4626,6 +4628,7 @@ CallProcedure::CallProcedure(std::shared_ptr<LogicalOperator> input, std::string
|
||||
memory_limit_(memory_limit),
|
||||
memory_scale_(memory_scale),
|
||||
is_write_(is_write),
|
||||
procedure_id_(procedure_id),
|
||||
void_procedure_(void_procedure) {}
|
||||
|
||||
ACCEPT_WITH_INPUT(CallProcedure);
|
||||
@ -4654,7 +4657,7 @@ namespace {
|
||||
void CallCustomProcedure(const std::string_view fully_qualified_procedure_name, const mgp_proc &proc,
|
||||
const std::vector<Expression *> &args, mgp_graph &graph, ExpressionEvaluator *evaluator,
|
||||
utils::MemoryResource *memory, std::optional<size_t> memory_limit, mgp_result *result,
|
||||
const bool call_initializer = false) {
|
||||
int64_t procedure_id, uint64_t transaction_id, const bool call_initializer = false) {
|
||||
static_assert(std::uses_allocator_v<mgp_value, utils::Allocator<mgp_value>>,
|
||||
"Expected mgp_value to use custom allocator and makes STL "
|
||||
"containers aware of that");
|
||||
@ -4686,12 +4689,46 @@ void CallCustomProcedure(const std::string_view fully_qualified_procedure_name,
|
||||
if (memory_limit) {
|
||||
SPDLOG_INFO("Running '{}' with memory limit of {}", fully_qualified_procedure_name,
|
||||
utils::GetReadableSize(*memory_limit));
|
||||
utils::LimitedMemoryResource limited_mem(memory, *memory_limit);
|
||||
mgp_memory proc_memory{&limited_mem};
|
||||
// Only allocations which can leak memory are
|
||||
// our own mgp object allocations. Jemalloc can track
|
||||
// memory correctly, but some memory may not be released
|
||||
// immediately, so we want to give user info on leak still
|
||||
// considering our allocations
|
||||
utils::MemoryTrackingResource memory_tracking_resource{memory, *memory_limit};
|
||||
// if we are already tracking, no harm, no foul
|
||||
// if we are not tracking, we need to start now, with unlimited memory
|
||||
// for query, but limited for procedure
|
||||
|
||||
// check if transaction is tracked currently, so we
|
||||
// can disable tracking on that arena if it is not
|
||||
// once we are done with procedure tracking
|
||||
|
||||
bool is_transaction_tracked = memgraph::memory::IsTransactionTracked(transaction_id);
|
||||
|
||||
if (!is_transaction_tracked) {
|
||||
// start tracking with unlimited limit on query
|
||||
// which is same as not being tracked at all
|
||||
memgraph::memory::TryStartTrackingOnTransaction(transaction_id, memgraph::memory::UNLIMITED_MEMORY);
|
||||
}
|
||||
memgraph::memory::StartTrackingCurrentThreadTransaction(transaction_id);
|
||||
|
||||
// due to mgp_batch_read_proc and mgp_batch_write_proc
|
||||
// we can return to execution without exhausting whole
|
||||
// memory. Here we need to update tracking
|
||||
memgraph::memory::CreateOrContinueProcedureTracking(transaction_id, procedure_id, *memory_limit);
|
||||
|
||||
mgp_memory proc_memory{&memory_tracking_resource};
|
||||
MG_ASSERT(result->signature == &proc.results);
|
||||
|
||||
utils::OnScopeExit on_scope_exit{[transaction_id = transaction_id]() {
|
||||
memgraph::memory::StopTrackingCurrentThreadTransaction(transaction_id);
|
||||
memgraph::memory::PauseProcedureTracking(transaction_id);
|
||||
}};
|
||||
|
||||
// TODO: What about cross library boundary exceptions? OMG C++?!
|
||||
proc.cb(&proc_args, &graph, result, &proc_memory);
|
||||
size_t leaked_bytes = limited_mem.GetAllocatedBytes();
|
||||
|
||||
auto leaked_bytes = memory_tracking_resource.GetAllocatedBytes();
|
||||
if (leaked_bytes > 0U) {
|
||||
spdlog::warn("Query procedure '{}' leaked {} *tracked* bytes", fully_qualified_procedure_name, leaked_bytes);
|
||||
}
|
||||
@ -4799,8 +4836,10 @@ class CallProcedureCursor : public Cursor {
|
||||
auto *memory = self_->memory_resource;
|
||||
auto memory_limit = EvaluateMemoryLimit(evaluator, self_->memory_limit_, self_->memory_scale_);
|
||||
auto graph = mgp_graph::WritableGraph(*context.db_accessor, graph_view, context);
|
||||
const auto transaction_id = context.db_accessor->GetTransactionId();
|
||||
MG_ASSERT(transaction_id.has_value());
|
||||
CallCustomProcedure(self_->procedure_name_, *proc, self_->arguments_, graph, &evaluator, memory, memory_limit,
|
||||
result_, call_initializer);
|
||||
result_, self_->procedure_id_, transaction_id.value(), call_initializer);
|
||||
|
||||
if (call_initializer) call_initializer = false;
|
||||
|
||||
|
@ -2361,7 +2361,7 @@ class CallProcedure : public memgraph::query::plan::LogicalOperator {
|
||||
CallProcedure() = default;
|
||||
CallProcedure(std::shared_ptr<LogicalOperator> input, std::string name, std::vector<Expression *> arguments,
|
||||
std::vector<std::string> fields, std::vector<Symbol> symbols, Expression *memory_limit,
|
||||
size_t memory_scale, bool is_write, bool void_procedure = false);
|
||||
size_t memory_scale, bool is_write, int64_t procedure_id, bool void_procedure = false);
|
||||
|
||||
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
|
||||
UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override;
|
||||
@ -2383,6 +2383,7 @@ class CallProcedure : public memgraph::query::plan::LogicalOperator {
|
||||
Expression *memory_limit_{nullptr};
|
||||
size_t memory_scale_{1024U};
|
||||
bool is_write_;
|
||||
int64_t procedure_id_;
|
||||
bool void_procedure_;
|
||||
mutable utils::MonotonicBufferResource monotonic_memory{1024UL * 1024UL};
|
||||
utils::MemoryResource *memory_resource = &monotonic_memory;
|
||||
@ -2405,6 +2406,7 @@ class CallProcedure : public memgraph::query::plan::LogicalOperator {
|
||||
object->memory_limit_ = memory_limit_ ? memory_limit_->Clone(storage) : nullptr;
|
||||
object->memory_scale_ = memory_scale_;
|
||||
object->is_write_ = is_write_;
|
||||
object->procedure_id_ = procedure_id_;
|
||||
object->void_procedure_ = void_procedure_;
|
||||
return object;
|
||||
}
|
||||
|
@ -12,6 +12,7 @@
|
||||
/// @file
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <optional>
|
||||
#include <variant>
|
||||
|
||||
@ -185,6 +186,10 @@ class RuleBasedPlanner {
|
||||
|
||||
uint64_t merge_id = 0;
|
||||
uint64_t subquery_id = 0;
|
||||
// procedures need to start from 1
|
||||
// due to swapping mechanism of procedure
|
||||
// tracking
|
||||
uint64_t procedure_id = 1;
|
||||
|
||||
for (const auto &clause : single_query_part.remaining_clauses) {
|
||||
MG_ASSERT(!utils::IsSubtype(*clause, Match::kType), "Unexpected Match in remaining clauses");
|
||||
@ -224,7 +229,7 @@ class RuleBasedPlanner {
|
||||
input_op = std::make_unique<plan::CallProcedure>(
|
||||
std::move(input_op), call_proc->procedure_name_, call_proc->arguments_, call_proc->result_fields_,
|
||||
result_symbols, call_proc->memory_limit_, call_proc->memory_scale_, call_proc->is_write_,
|
||||
call_proc->void_procedure_);
|
||||
procedure_id++, call_proc->void_procedure_);
|
||||
} else if (auto *load_csv = utils::Downcast<query::LoadCsv>(clause)) {
|
||||
const auto &row_sym = context.symbol_table->at(*load_csv->row_var_);
|
||||
context.bound_symbols.insert(row_sym);
|
||||
|
@ -169,7 +169,7 @@ Trigger::TriggerPlan::TriggerPlan(std::unique_ptr<LogicalPlan> logical_plan, std
|
||||
std::shared_ptr<Trigger::TriggerPlan> Trigger::GetPlan(DbAccessor *db_accessor,
|
||||
const query::AuthChecker *auth_checker) const {
|
||||
std::lock_guard plan_guard{plan_lock_};
|
||||
if (!parsed_statements_.is_cacheable || !trigger_plan_ || trigger_plan_->cached_plan.IsExpired()) {
|
||||
if (!parsed_statements_.is_cacheable || !trigger_plan_) {
|
||||
auto identifiers = GetPredefinedIdentifiers(event_type_);
|
||||
|
||||
AstStorage ast_storage;
|
||||
|
@ -62,7 +62,7 @@ struct Trigger {
|
||||
|
||||
explicit TriggerPlan(std::unique_ptr<LogicalPlan> logical_plan, std::vector<IdentifierInfo> identifiers);
|
||||
|
||||
CachedPlan cached_plan;
|
||||
PlanWrapper cached_plan;
|
||||
std::vector<IdentifierInfo> identifiers;
|
||||
};
|
||||
std::shared_ptr<TriggerPlan> GetPlan(DbAccessor *db_accessor, const query::AuthChecker *auth_checker) const;
|
||||
|
@ -14,7 +14,8 @@ set(utils_src_files
|
||||
tsc.cpp
|
||||
system_info.cpp
|
||||
uuid.cpp
|
||||
build_info.cpp)
|
||||
build_info.cpp
|
||||
query_memory_tracker.cpp)
|
||||
|
||||
find_package(Boost REQUIRED)
|
||||
find_package(fmt REQUIRED)
|
||||
|
68
src/utils/lru_cache.hpp
Normal file
68
src/utils/lru_cache.hpp
Normal file
@ -0,0 +1,68 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <list>
|
||||
#include <optional>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
|
||||
namespace memgraph::utils {
|
||||
|
||||
/// A simple LRU (least-recently-used) cache.
/// Both `put` and `get` mark the touched key as most-recently used; once the
/// number of entries exceeds the configured capacity, the least-recently-used
/// entries are evicted.
/// It is not thread-safe.
template <class TKey, class TVal>
class LRUCache {
 public:
  /// @param cache_size maximum number of entries kept; negative values are
  /// clamped to 0 (the original implementation converted them to a huge
  /// unsigned capacity, effectively disabling eviction).
  LRUCache(int cache_size) : cache_size_(cache_size > 0 ? static_cast<std::size_t>(cache_size) : 0U) {}

  /// Insert or overwrite `key` with `val` and mark it most-recently used.
  /// May evict the least-recently-used entry when capacity is exceeded.
  void put(const TKey &key, const TVal &val) {
    auto it = item_map_.find(key);
    if (it != item_map_.end()) {
      // Key already cached: update the value in place and promote the node to
      // the front via splice (no allocation, list iterators stay valid).
      it->second->second = val;
      item_list_.splice(item_list_.begin(), item_list_, it->second);
      return;
    }
    item_list_.emplace_front(key, val);
    item_map_.emplace(key, item_list_.begin());
    evict_while_over_capacity();
  }

  /// Return a copy of the value stored under `key` (promoting it to
  /// most-recently used), or std::nullopt when the key is absent.
  std::optional<TVal> get(const TKey &key) {
    // Single lookup instead of exists() + find().
    auto it = item_map_.find(key);
    if (it == item_map_.end()) {
      return std::nullopt;
    }
    item_list_.splice(item_list_.begin(), item_list_, it->second);
    return it->second->second;
  }

  /// Drop every cached entry.
  void reset() {
    item_list_.clear();
    item_map_.clear();
  }

  /// Number of entries currently cached.
  std::size_t size() const { return item_map_.size(); }

  /// True when `key` is cached. Does not affect LRU ordering.
  bool exists(const TKey &key) const { return item_map_.count(key) > 0; }

 private:
  /// Evict least-recently-used entries (list tail) until the capacity
  /// constraint holds.
  void evict_while_over_capacity() {
    while (item_map_.size() > cache_size_) {
      const auto &lru_entry = item_list_.back();
      item_map_.erase(lru_entry.first);
      item_list_.pop_back();
    }
  }

  // Front of the list is most-recently used; back is the eviction candidate.
  std::list<std::pair<TKey, TVal>> item_list_;
  // Maps a key to its node in item_list_ for O(1) promotion/removal.
  std::unordered_map<TKey, typename std::list<std::pair<TKey, TVal>>::iterator> item_map_;
  std::size_t cache_size_;
};
|
||||
} // namespace memgraph::utils
|
@ -539,9 +539,9 @@ class SynchronizedPoolResource final : public MemoryResource {
|
||||
bool DoIsEqual(const MemoryResource &other) const noexcept override { return this == &other; }
|
||||
};
|
||||
|
||||
class LimitedMemoryResource final : public utils::MemoryResource {
|
||||
class MemoryTrackingResource final : public utils::MemoryResource {
|
||||
public:
|
||||
explicit LimitedMemoryResource(utils::MemoryResource *memory, size_t max_allocated_bytes)
|
||||
explicit MemoryTrackingResource(utils::MemoryResource *memory, size_t max_allocated_bytes)
|
||||
: memory_(memory), max_allocated_bytes_(max_allocated_bytes) {}
|
||||
|
||||
size_t GetAllocatedBytes() const noexcept { return max_allocated_bytes_ - available_bytes_; }
|
||||
@ -552,13 +552,11 @@ class LimitedMemoryResource final : public utils::MemoryResource {
|
||||
size_t available_bytes_{max_allocated_bytes_};
|
||||
|
||||
void *DoAllocate(size_t bytes, size_t alignment) override {
|
||||
if (bytes > available_bytes_) throw utils::BadAlloc("Memory allocation limit exceeded!");
|
||||
available_bytes_ -= bytes;
|
||||
return memory_->Allocate(bytes, alignment);
|
||||
}
|
||||
|
||||
void DoDeallocate(void *p, size_t bytes, size_t alignment) override {
|
||||
MG_ASSERT(available_bytes_ + bytes > available_bytes_, "Failed deallocation");
|
||||
available_bytes_ += bytes;
|
||||
return memory_->Deallocate(p, bytes, alignment);
|
||||
}
|
||||
|
@ -111,7 +111,7 @@ void MemoryTracker::Alloc(const int64_t size) {
|
||||
|
||||
const auto current_hard_limit = hard_limit_.load(std::memory_order_relaxed);
|
||||
|
||||
if (UNLIKELY(current_hard_limit && will_be > current_hard_limit && MemoryTrackerCanThrow())) {
|
||||
if (current_hard_limit && will_be > current_hard_limit && MemoryTrackerCanThrow()) [[unlikely]] {
|
||||
MemoryTracker::OutOfMemoryExceptionBlocker exception_blocker;
|
||||
|
||||
amount_.fetch_sub(size, std::memory_order_relaxed);
|
||||
@ -121,7 +121,6 @@ void MemoryTracker::Alloc(const int64_t size) {
|
||||
"use to {}, while the maximum allowed size for allocation is set to {}.",
|
||||
GetReadableSize(size), GetReadableSize(will_be), GetReadableSize(current_hard_limit)));
|
||||
}
|
||||
|
||||
UpdatePeak(will_be);
|
||||
}
|
||||
|
||||
|
@ -25,17 +25,6 @@ class OutOfMemoryException : public utils::BasicException {
|
||||
};
|
||||
|
||||
class MemoryTracker final {
|
||||
private:
|
||||
std::atomic<int64_t> amount_{0};
|
||||
std::atomic<int64_t> peak_{0};
|
||||
std::atomic<int64_t> hard_limit_{0};
|
||||
// Maximum possible value of a hard limit. If it's set to 0, no upper bound on the hard limit is set.
|
||||
int64_t maximum_hard_limit_{0};
|
||||
|
||||
void UpdatePeak(int64_t will_be);
|
||||
|
||||
static void LogMemoryUsage(int64_t current);
|
||||
|
||||
public:
|
||||
void LogPeakMemoryUsage() const;
|
||||
|
||||
@ -73,6 +62,13 @@ class MemoryTracker final {
|
||||
|
||||
void ResetTrackings();
|
||||
|
||||
bool IsProcedureTracked();
|
||||
|
||||
void SetProcTrackingLimit(size_t limit);
|
||||
|
||||
void StartProcTracking();
|
||||
void StopProcTracking();
|
||||
|
||||
// By creating an object of this class, every allocation in its scope that goes over
|
||||
// the set hard limit produces an OutOfMemoryException.
|
||||
class OutOfMemoryExceptionEnabler final {
|
||||
@ -109,6 +105,20 @@ class MemoryTracker final {
|
||||
private:
|
||||
static thread_local uint64_t counter_;
|
||||
};
|
||||
|
||||
private:
|
||||
enum class TrackingMode { DEFAULT, ADDITIONAL_PROC };
|
||||
|
||||
TrackingMode tracking_mode_{TrackingMode::DEFAULT};
|
||||
std::atomic<int64_t> amount_{0};
|
||||
std::atomic<int64_t> peak_{0};
|
||||
std::atomic<int64_t> hard_limit_{0};
|
||||
// Maximum possible value of a hard limit. If it's set to 0, no upper bound on the hard limit is set.
|
||||
int64_t maximum_hard_limit_{0};
|
||||
|
||||
void UpdatePeak(int64_t will_be);
|
||||
|
||||
static void LogMemoryUsage(int64_t current);
|
||||
};
|
||||
|
||||
// Global memory tracker which tracks every allocation in the application.
|
||||
|
78
src/utils/query_memory_tracker.cpp
Normal file
78
src/utils/query_memory_tracker.cpp
Normal file
@ -0,0 +1,78 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "utils/query_memory_tracker.hpp"
|
||||
#include <atomic>
|
||||
#include <optional>
|
||||
#include "memory/query_memory_control.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
|
||||
namespace memgraph::utils {
|
||||
|
||||
// Charge an allocation of `size` bytes to the query tracker (when one was
// installed via SetQueryLimit) and to the currently active procedure tracker
// (when a procedure is active).
void QueryMemoryTracker::TrackAlloc(size_t size) {
  const auto delta = static_cast<int64_t>(size);

  if (query_tracker_.has_value()) [[likely]] {
    query_tracker_->Alloc(delta);
  }

  if (auto *proc_tracker = GetActiveProc(); proc_tracker != nullptr) {
    proc_tracker->Alloc(delta);
  }
}
|
||||
// Credit a release of `size` bytes back to the query tracker (when one was
// installed via SetQueryLimit) and to the currently active procedure tracker
// (when a procedure is active).
void QueryMemoryTracker::TrackFree(size_t size) {
  const auto delta = static_cast<int64_t>(size);

  if (query_tracker_.has_value()) [[likely]] {
    query_tracker_->Free(delta);
  }

  if (auto *proc_tracker = GetActiveProc(); proc_tracker != nullptr) {
    proc_tracker->Free(delta);
  }
}
|
||||
|
||||
// Set the query-wide hard memory limit, in bytes. UNLIMITED_MEMORY is a no-op:
// no tracker is created, so TrackAlloc/TrackFree skip query-level accounting.
// NOTE(review): calling this again replaces the existing query tracker and its
// accumulated counters (InitializeQueryTracker re-emplaces) — confirm callers
// only invoke it once per query.
void QueryMemoryTracker::SetQueryLimit(size_t size) {
  if (size == memgraph::memory::UNLIMITED_MEMORY) {
    return;
  }
  InitializeQueryTracker();
  query_tracker_->SetMaximumHardLimit(static_cast<int64_t>(size));
  query_tracker_->SetHardLimit(static_cast<int64_t>(size));
}
|
||||
|
||||
// Return the memory tracker of the currently active procedure, or nullptr when
// no procedure is active.
// NOTE(review): operator[] default-constructs a tracker if active_proc_id has
// no entry — this assumes TryCreateProcTracker always ran before the id was
// made active; confirm against callers.
memgraph::utils::MemoryTracker *QueryMemoryTracker::GetActiveProc() {
  if (active_proc_id == NO_PROCEDURE) [[likely]] {
    return nullptr;
  }
  return &proc_memory_trackers_[active_proc_id];
}
|
||||
|
||||
// Mark the procedure with the given id as the one whose allocations are
// currently being tracked alongside the query.
void QueryMemoryTracker::SetActiveProc(int64_t new_active_proc) { active_proc_id = new_active_proc; }
|
||||
|
||||
// Deactivate procedure-level tracking; subsequent allocations are charged to
// the query tracker only. The procedure's tracker (and its counters) is kept.
void QueryMemoryTracker::StopProcTracking() { active_proc_id = QueryMemoryTracker::NO_PROCEDURE; }
|
||||
|
||||
// Create a memory tracker with the given hard limit (in bytes) for the given
// procedure id. Idempotent: if a tracker already exists for the id, it is kept
// untouched (its counters survive) and the new limit is ignored.
void QueryMemoryTracker::TryCreateProcTracker(int64_t procedure_id, size_t limit) {
  // try_emplace performs a single lookup and constructs the tracker in place;
  // the original contains() + emplace(id, MemoryTracker{}) did two lookups and
  // moved a temporary.
  auto [it, inserted] = proc_memory_trackers_.try_emplace(procedure_id);
  if (!inserted) {
    return;
  }
  it->second.SetMaximumHardLimit(static_cast<int64_t>(limit));
  it->second.SetHardLimit(static_cast<int64_t>(limit));
}
|
||||
|
||||
// (Re)create the query-level tracker in place. emplace() default-constructs
// directly inside the optional; the original emplace(MemoryTracker{}) built a
// temporary and move-constructed from it.
void QueryMemoryTracker::InitializeQueryTracker() { query_tracker_.emplace(); }
|
||||
|
||||
} // namespace memgraph::utils
|
78
src/utils/query_memory_tracker.hpp
Normal file
78
src/utils/query_memory_tracker.hpp
Normal file
@ -0,0 +1,78 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <optional>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include "utils/memory_tracker.hpp"
|
||||
|
||||
namespace memgraph::utils {
|
||||
|
||||
// Class which handles tracking of query memory together with procedure
// memory. The default active procedure id is -1 (NO_PROCEDURE), as real
// procedure ids start from 1.
// This class is meant to be used in an environment where we don't execute
// multiple procedures in parallel, but a procedure can have multiple
// threads which are doing allocations.
class QueryMemoryTracker {
 public:
  QueryMemoryTracker() = default;

  // Move transfers both the query and the per-procedure trackers; `other` is
  // left with no active procedure.
  QueryMemoryTracker(QueryMemoryTracker &&other) noexcept
      : query_tracker_(std::move(other.query_tracker_)),
        proc_memory_trackers_(std::move(other.proc_memory_trackers_)),
        active_proc_id(other.active_proc_id) {
    other.active_proc_id = NO_PROCEDURE;
  }

  QueryMemoryTracker(const QueryMemoryTracker &other) = delete;

  QueryMemoryTracker &operator=(QueryMemoryTracker &&other) = delete;
  QueryMemoryTracker &operator=(const QueryMemoryTracker &other) = delete;

  ~QueryMemoryTracker() = default;

  // Track an allocation (bytes) on the query tracker and, if a procedure is
  // active, on that procedure's tracker as well.
  void TrackAlloc(size_t);

  // Track a release (bytes) on the query tracker and, if a procedure is
  // active, on that procedure's tracker as well.
  void TrackFree(size_t);

  // Set the query-wide hard memory limit in bytes; UNLIMITED_MEMORY leaves
  // the query untracked.
  void SetQueryLimit(size_t);

  // Create a tracker with the given limit for the procedure id if one doesn't
  // already exist; an existing tracker is left untouched.
  void TryCreateProcTracker(int64_t, size_t);

  // Set the currently active procedure id.
  void SetActiveProc(int64_t);

  // Stop procedure tracking (resets the active id to NO_PROCEDURE).
  void StopProcTracking();

 private:
  static constexpr int64_t NO_PROCEDURE{-1};
  // Lazily (re)creates query_tracker_.
  void InitializeQueryTracker();

  // Engaged only after SetQueryLimit was called with a real limit.
  std::optional<memgraph::utils::MemoryTracker> query_tracker_{std::nullopt};
  // Per-procedure trackers, keyed by procedure id.
  std::unordered_map<int64_t, memgraph::utils::MemoryTracker> proc_memory_trackers_;

  // Procedure ids start from 1. Procedure id -1 means there is no procedure
  // to track.
  int64_t active_proc_id{NO_PROCEDURE};

  // Returns the tracker of the active procedure, or nullptr if none.
  memgraph::utils::MemoryTracker *GetActiveProc();
};
|
||||
|
||||
} // namespace memgraph::utils
|
@ -184,7 +184,7 @@ startup_config_dict = {
|
||||
"Set to true to enable telemetry. We collect information about the running system (CPU and memory information) and information about the database runtime (vertex and edge counts and resource usage) to allow for easier improvement of the product.",
|
||||
),
|
||||
"query_cost_planner": ("true", "true", "Use the cost-estimating query planner."),
|
||||
"query_plan_cache_ttl": ("60", "60", "Time to live for cached query plans, in seconds."),
|
||||
"query_plan_cache_max_size": ("1000", "1000", "Maximum number of query plans to cache."),
|
||||
"query_vertex_count_to_expand_existing": (
|
||||
"10",
|
||||
"10",
|
||||
|
@ -11,6 +11,17 @@ target_link_libraries(memgraph__e2e__memory__limit_global_alloc gflags mgclient
|
||||
add_executable(memgraph__e2e__memory__limit_global_alloc_proc memory_limit_global_alloc_proc.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_global_alloc_proc gflags mgclient mg-utils mg-io Threads::Threads)
|
||||
|
||||
add_executable(memgraph__e2e__memory__limit_delete memory_limit_delete.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_delete gflags mgclient mg-utils mg-io)
|
||||
|
||||
add_executable(memgraph__e2e__memory__limit_accumulation memory_limit_accumulation.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_accumulation gflags mgclient mg-utils mg-io)
|
||||
|
||||
add_executable(memgraph__e2e__memory__limit_edge_create memory_limit_edge_create.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_edge_create gflags mgclient mg-utils mg-io)
|
||||
|
||||
# Query memory limit tests
|
||||
|
||||
add_executable(memgraph__e2e__memory__limit_query_alloc_proc_multi_thread query_memory_limit_proc_multi_thread.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_query_alloc_proc_multi_thread gflags mgclient mg-utils mg-io Threads::Threads)
|
||||
|
||||
@ -23,11 +34,11 @@ target_link_libraries(memgraph__e2e__memory__limit_query_alloc_proc gflags mgcli
|
||||
add_executable(memgraph__e2e__memory__limit_query_alloc_create_multi_thread query_memory_limit_multi_thread.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_query_alloc_create_multi_thread gflags mgclient mg-utils mg-io Threads::Threads)
|
||||
|
||||
add_executable(memgraph__e2e__memory__limit_delete memory_limit_delete.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_delete gflags mgclient mg-utils mg-io)
|
||||
|
||||
add_executable(memgraph__e2e__memory__limit_accumulation memory_limit_accumulation.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_accumulation gflags mgclient mg-utils mg-io)
|
||||
# Procedure memory limit tests
|
||||
|
||||
add_executable(memgraph__e2e__memory__limit_edge_create memory_limit_edge_create.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_edge_create gflags mgclient mg-utils mg-io)
|
||||
add_executable(memgraph__e2e__procedure_memory_limit procedure_memory_limit.cpp)
|
||||
target_link_libraries(memgraph__e2e__procedure_memory_limit gflags mgclient mg-utils mg-io)
|
||||
|
||||
add_executable(memgraph__e2e__procedure_memory_limit_multi_proc procedure_memory_limit_multi_proc.cpp)
|
||||
target_link_libraries(memgraph__e2e__procedure_memory_limit_multi_proc gflags mgclient mg-utils mg-io)
|
||||
|
66
tests/e2e/memory/procedure_memory_limit.cpp
Normal file
66
tests/e2e/memory/procedure_memory_limit.cpp
Normal file
@ -0,0 +1,66 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <algorithm>
|
||||
#include <exception>
|
||||
#include <ios>
|
||||
#include <iostream>
|
||||
#include <mgclient.hpp>
|
||||
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_uint64(bolt_port, 7687, "Bolt port");
|
||||
DEFINE_uint64(timeout, 120, "Timeout seconds");
|
||||
DEFINE_bool(multi_db, false, "Run test in multi db environment");
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::SetUsageMessage("Memgraph E2E Query Memory Limit In Multi-Thread For Global Allocators");
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
memgraph::logging::RedirectToStderr();
|
||||
|
||||
mg::Client::Init();
|
||||
|
||||
auto client =
|
||||
mg::Client::Connect({.host = "127.0.0.1", .port = static_cast<uint16_t>(FLAGS_bolt_port), .use_ssl = false});
|
||||
if (!client) {
|
||||
LOG_FATAL("Failed to connect!");
|
||||
}
|
||||
|
||||
if (FLAGS_multi_db) {
|
||||
client->Execute("CREATE DATABASE clean;");
|
||||
client->DiscardAll();
|
||||
client->Execute("USE DATABASE clean;");
|
||||
client->DiscardAll();
|
||||
client->Execute("MATCH (n) DETACH DELETE n;");
|
||||
client->DiscardAll();
|
||||
}
|
||||
|
||||
bool error{false};
|
||||
try {
|
||||
client->Execute(
|
||||
"CALL libproc_memory_limit.alloc_256_mib() PROCEDURE MEMORY LIMIT 200MB YIELD allocated RETURN "
|
||||
"allocated");
|
||||
auto result_rows = client->FetchAll();
|
||||
if (result_rows) {
|
||||
auto row = *result_rows->begin();
|
||||
error = row[0].ValueBool() == false;
|
||||
}
|
||||
|
||||
} catch (const std::exception &e) {
|
||||
error = true;
|
||||
}
|
||||
|
||||
MG_ASSERT(error, "Error should have happend");
|
||||
|
||||
return 0;
|
||||
}
|
68
tests/e2e/memory/procedure_memory_limit_multi_proc.cpp
Normal file
68
tests/e2e/memory/procedure_memory_limit_multi_proc.cpp
Normal file
@ -0,0 +1,68 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <algorithm>
|
||||
#include <exception>
|
||||
#include <ios>
|
||||
#include <iostream>
|
||||
#include <mgclient.hpp>
|
||||
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_uint64(bolt_port, 7687, "Bolt port");
|
||||
DEFINE_uint64(timeout, 120, "Timeout seconds");
|
||||
DEFINE_bool(multi_db, false, "Run test in multi db environment");
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::SetUsageMessage("Memgraph E2E Query Memory Limit In Multi-Thread For Global Allocators");
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
memgraph::logging::RedirectToStderr();
|
||||
|
||||
mg::Client::Init();
|
||||
|
||||
auto client =
|
||||
mg::Client::Connect({.host = "127.0.0.1", .port = static_cast<uint16_t>(FLAGS_bolt_port), .use_ssl = false});
|
||||
if (!client) {
|
||||
LOG_FATAL("Failed to connect!");
|
||||
}
|
||||
|
||||
if (FLAGS_multi_db) {
|
||||
client->Execute("CREATE DATABASE clean;");
|
||||
client->DiscardAll();
|
||||
client->Execute("USE DATABASE clean;");
|
||||
client->DiscardAll();
|
||||
client->Execute("MATCH (n) DETACH DELETE n;");
|
||||
client->DiscardAll();
|
||||
}
|
||||
|
||||
bool test_passed{false};
|
||||
try {
|
||||
client->Execute(
|
||||
"CALL libproc_memory_limit.alloc_256_mib() PROCEDURE MEMORY LIMIT 400MB YIELD allocated WITH allocated AS "
|
||||
"allocated_1"
|
||||
"CALL libproc_memory_limit.alloc_32_mib() PROCEDURE MEMORY LIMIT 10MB YIELD allocated AS allocated_2 RETURN "
|
||||
"allocated_1, allocated_2");
|
||||
auto result_rows = client->FetchAll();
|
||||
if (result_rows) {
|
||||
auto row = *result_rows->begin();
|
||||
test_passed = row[0].ValueBool() == true && row[0].ValueBool() == false;
|
||||
}
|
||||
|
||||
} catch (const std::exception &e) {
|
||||
test_passed = true;
|
||||
}
|
||||
|
||||
MG_ASSERT(test_passed, "Error should have happend");
|
||||
|
||||
return 0;
|
||||
}
|
@ -13,3 +13,8 @@ target_link_libraries(query_memory_limit_proc_multi_thread mg-utils)
|
||||
add_library(query_memory_limit_proc SHARED query_memory_limit_proc.cpp)
|
||||
target_include_directories(query_memory_limit_proc PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
target_link_libraries(query_memory_limit_proc mg-utils)
|
||||
|
||||
|
||||
add_library(proc_memory_limit SHARED proc_memory_limit.cpp)
|
||||
target_include_directories(proc_memory_limit PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
target_link_libraries(proc_memory_limit mg-utils)
|
||||
|
102
tests/e2e/memory/procedures/proc_memory_limit.cpp
Normal file
102
tests/e2e/memory/procedures/proc_memory_limit.cpp
Normal file
@ -0,0 +1,102 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
#include <exception>
|
||||
#include <functional>
|
||||
#include <mgp.hpp>
|
||||
#include <mutex>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "mg_procedure.h"
|
||||
#include "utils/on_scope_exit.hpp"
|
||||
|
||||
enum mgp_error Alloc_256(mgp_memory *memory, void *ptr) {
|
||||
const size_t mib_size_256 = 1 << 28;
|
||||
|
||||
return mgp_alloc(memory, mib_size_256, (void **)(&ptr));
|
||||
}
|
||||
|
||||
void Alloc_256_MiB(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
|
||||
mgp::MemoryDispatcherGuard guard{memory};
|
||||
const auto arguments = mgp::List(args);
|
||||
const auto record_factory = mgp::RecordFactory(result);
|
||||
|
||||
try {
|
||||
void *ptr{nullptr};
|
||||
|
||||
memgraph::utils::OnScopeExit<std::function<void(void)>> cleanup{[&memory, &ptr]() {
|
||||
if (nullptr == ptr) {
|
||||
return;
|
||||
}
|
||||
mgp_free(memory, ptr);
|
||||
}};
|
||||
|
||||
const enum mgp_error alloc_err = Alloc_256(memory, ptr);
|
||||
auto new_record = record_factory.NewRecord();
|
||||
new_record.Insert("allocated", alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE);
|
||||
} catch (std::exception &e) {
|
||||
record_factory.SetErrorMessage(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
enum mgp_error Alloc_32(mgp_memory *memory, void *ptr) {
|
||||
const size_t mib_size_32 = 1 << 25;
|
||||
|
||||
return mgp_alloc(memory, mib_size_32, (void **)(&ptr));
|
||||
}
|
||||
|
||||
void Alloc_32_MiB(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
|
||||
mgp::MemoryDispatcherGuard guard{memory};
|
||||
const auto arguments = mgp::List(args);
|
||||
const auto record_factory = mgp::RecordFactory(result);
|
||||
|
||||
try {
|
||||
void *ptr{nullptr};
|
||||
|
||||
memgraph::utils::OnScopeExit<std::function<void(void)>> cleanup{[&ptr]() {
|
||||
if (nullptr == ptr) {
|
||||
return;
|
||||
}
|
||||
mgp_global_free(ptr);
|
||||
}};
|
||||
|
||||
const enum mgp_error alloc_err = Alloc_32(memory, ptr);
|
||||
auto new_record = record_factory.NewRecord();
|
||||
new_record.Insert("allocated", alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE);
|
||||
} catch (std::exception &e) {
|
||||
record_factory.SetErrorMessage(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
|
||||
try {
|
||||
mgp::MemoryDispatcherGuard mdg{memory};
|
||||
|
||||
AddProcedure(Alloc_256_MiB, std::string("alloc_256_mib").c_str(), mgp::ProcedureType::Read, {},
|
||||
{mgp::Return(std::string("allocated").c_str(), mgp::Type::Bool)}, module, memory);
|
||||
|
||||
AddProcedure(Alloc_32_MiB, std::string("alloc_32_mib").c_str(), mgp::ProcedureType::Read, {},
|
||||
{mgp::Return(std::string("allocated").c_str(), mgp::Type::Bool)}, module, memory);
|
||||
|
||||
} catch (const std::exception &e) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Module teardown hook; this module holds no global state, so always report success (0).
extern "C" int mgp_shutdown_module() { return 0; }
|
@ -62,6 +62,20 @@ disk_450_MiB_limit_cluster: &disk_450_MiB_limit_cluster
|
||||
validation_queries: []
|
||||
|
||||
|
||||
args_global_limit_1024_MiB: &args_global_limit_1024_MiB
|
||||
- "--bolt-port"
|
||||
- *bolt_port
|
||||
- "--storage-gc-cycle-sec=180"
|
||||
- "--log-level=INFO"
|
||||
|
||||
in_memory_limited_global_limit_cluster: &in_memory_limited_global_limit_cluster
|
||||
cluster:
|
||||
main:
|
||||
args: *args_global_limit_1024_MiB
|
||||
log_file: "memory-e2e.log"
|
||||
setup_queries: []
|
||||
validation_queries: []
|
||||
|
||||
workloads:
|
||||
- name: "Memory control"
|
||||
binary: "tests/e2e/memory/memgraph__e2e__memory__control"
|
||||
@ -159,3 +173,15 @@ workloads:
|
||||
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_edge_create"
|
||||
args: ["--bolt-port", *bolt_port]
|
||||
<<: *disk_450_MiB_limit_cluster
|
||||
|
||||
- name: "Procedure memory control for single procedure"
|
||||
binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit"
|
||||
proc: "tests/e2e/memory/procedures/"
|
||||
args: ["--bolt-port", *bolt_port]
|
||||
<<: *in_memory_limited_global_limit_cluster
|
||||
|
||||
- name: "Procedure memory control for multiple procedures"
|
||||
binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit"
|
||||
proc: "tests/e2e/memory/procedures/"
|
||||
args: ["--bolt-port", *bolt_port]
|
||||
<<: *in_memory_limited_global_limit_cluster
|
||||
|
@ -103,3 +103,21 @@ Feature: Bfs
|
||||
Then the result should be:
|
||||
| s | r0 |
|
||||
| 1 | 1 |
|
||||
|
||||
Scenario: Test accessing a variable bound to a list within BFS function
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:Node {id: 0})-[:LINK {date: '2023-03'}]->(:Node {id: 1}),
|
||||
(:Node {id: 1})-[:LINK {date: '2023-04'}]->(:Node {id: 2}),
|
||||
(:Node {id: 2})-[:LINK {date: '2023-03'}]->(:Node {id: 3});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
WITH ['2023-03'] AS date
|
||||
MATCH p = (:Node)-[*BFS ..2 (r, n | r.date IN date)]->(:Node {id: 3})
|
||||
RETURN p
|
||||
"""
|
||||
Then the result should be:
|
||||
| p |
|
||||
| <(:Node {id: 2})-[:LINK {date: '2023-03'}]->(:Node {id: 3})> |
|
||||
|
@ -335,7 +335,7 @@ if __name__ == "__main__":
|
||||
continue
|
||||
else:
|
||||
cleaned.append(field)
|
||||
fields = cleaned
|
||||
fields = cleaned
|
||||
|
||||
if args.difference_threshold > 0.01:
|
||||
for field in fields:
|
||||
|
@ -296,6 +296,9 @@ target_link_libraries(${test_prefix}utils_java_string_formatter mg-utils)
|
||||
add_unit_test(utils_resource_lock.cpp)
|
||||
target_link_libraries(${test_prefix}utils_resource_lock mg-utils)
|
||||
|
||||
add_unit_test(lru_cache.cpp)
|
||||
target_link_libraries(${test_prefix}lru_cache mg-utils)
|
||||
|
||||
# Test mg-storage-v2
|
||||
add_unit_test(commit_log_v2.cpp)
|
||||
target_link_libraries(${test_prefix}commit_log_v2 gflags mg-utils mg-storage-v2)
|
||||
|
@ -37,6 +37,8 @@
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v2/storage_mode.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/lru_cache.hpp"
|
||||
#include "utils/synchronized.hpp"
|
||||
|
||||
namespace {
|
||||
|
||||
@ -662,7 +664,7 @@ TYPED_TEST(InterpreterTest, UniqueConstraintTest) {
|
||||
}
|
||||
|
||||
TYPED_TEST(InterpreterTest, ExplainQuery) {
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 0U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 0U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 0U);
|
||||
auto stream = this->Interpret("EXPLAIN MATCH (n) RETURN *;");
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
@ -676,16 +678,16 @@ TYPED_TEST(InterpreterTest, ExplainQuery) {
|
||||
++expected_it;
|
||||
}
|
||||
// We should have a plan cache for MATCH ...
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
// We should have AST cache for EXPLAIN ... and for inner MATCH ...
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
this->Interpret("MATCH (n) RETURN *;");
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
}
|
||||
|
||||
TYPED_TEST(InterpreterTest, ExplainQueryMultiplePulls) {
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 0U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 0U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 0U);
|
||||
auto [stream, qid] = this->Prepare("EXPLAIN MATCH (n) RETURN *;");
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
@ -709,16 +711,16 @@ TYPED_TEST(InterpreterTest, ExplainQueryMultiplePulls) {
|
||||
ASSERT_EQ(stream.GetResults()[2].size(), 1U);
|
||||
EXPECT_EQ(stream.GetResults()[2].front().ValueString(), *expected_it);
|
||||
// We should have a plan cache for MATCH ...
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
// We should have AST cache for EXPLAIN ... and for inner MATCH ...
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
this->Interpret("MATCH (n) RETURN *;");
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
}
|
||||
|
||||
TYPED_TEST(InterpreterTest, ExplainQueryInMulticommandTransaction) {
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 0U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 0U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 0U);
|
||||
this->Interpret("BEGIN");
|
||||
auto stream = this->Interpret("EXPLAIN MATCH (n) RETURN *;");
|
||||
@ -734,16 +736,16 @@ TYPED_TEST(InterpreterTest, ExplainQueryInMulticommandTransaction) {
|
||||
++expected_it;
|
||||
}
|
||||
// We should have a plan cache for MATCH ...
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
// We should have AST cache for EXPLAIN ... and for inner MATCH ...
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
this->Interpret("MATCH (n) RETURN *;");
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
}
|
||||
|
||||
TYPED_TEST(InterpreterTest, ExplainQueryWithParams) {
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 0U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 0U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 0U);
|
||||
auto stream =
|
||||
this->Interpret("EXPLAIN MATCH (n) WHERE n.id = $id RETURN *;", {{"id", memgraph::storage::PropertyValue(42)}});
|
||||
@ -758,16 +760,16 @@ TYPED_TEST(InterpreterTest, ExplainQueryWithParams) {
|
||||
++expected_it;
|
||||
}
|
||||
// We should have a plan cache for MATCH ...
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
// We should have AST cache for EXPLAIN ... and for inner MATCH ...
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
this->Interpret("MATCH (n) WHERE n.id = $id RETURN *;", {{"id", memgraph::storage::PropertyValue("something else")}});
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
}
|
||||
|
||||
TYPED_TEST(InterpreterTest, ProfileQuery) {
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 0U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 0U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 0U);
|
||||
auto stream = this->Interpret("PROFILE MATCH (n) RETURN *;");
|
||||
std::vector<std::string> expected_header{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"};
|
||||
@ -781,16 +783,16 @@ TYPED_TEST(InterpreterTest, ProfileQuery) {
|
||||
++expected_it;
|
||||
}
|
||||
// We should have a plan cache for MATCH ...
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
// We should have AST cache for PROFILE ... and for inner MATCH ...
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
this->Interpret("MATCH (n) RETURN *;");
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
}
|
||||
|
||||
TYPED_TEST(InterpreterTest, ProfileQueryMultiplePulls) {
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 0U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 0U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 0U);
|
||||
auto [stream, qid] = this->Prepare("PROFILE MATCH (n) RETURN *;");
|
||||
std::vector<std::string> expected_header{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"};
|
||||
@ -817,11 +819,11 @@ TYPED_TEST(InterpreterTest, ProfileQueryMultiplePulls) {
|
||||
ASSERT_EQ(stream.GetResults()[2][0].ValueString(), *expected_it);
|
||||
|
||||
// We should have a plan cache for MATCH ...
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
// We should have AST cache for PROFILE ... and for inner MATCH ...
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
this->Interpret("MATCH (n) RETURN *;");
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
}
|
||||
|
||||
@ -832,7 +834,7 @@ TYPED_TEST(InterpreterTest, ProfileQueryInMulticommandTransaction) {
|
||||
}
|
||||
|
||||
TYPED_TEST(InterpreterTest, ProfileQueryWithParams) {
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 0U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 0U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 0U);
|
||||
auto stream =
|
||||
this->Interpret("PROFILE MATCH (n) WHERE n.id = $id RETURN *;", {{"id", memgraph::storage::PropertyValue(42)}});
|
||||
@ -847,16 +849,16 @@ TYPED_TEST(InterpreterTest, ProfileQueryWithParams) {
|
||||
++expected_it;
|
||||
}
|
||||
// We should have a plan cache for MATCH ...
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
// We should have AST cache for PROFILE ... and for inner MATCH ...
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
this->Interpret("MATCH (n) WHERE n.id = $id RETURN *;", {{"id", memgraph::storage::PropertyValue("something else")}});
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
}
|
||||
|
||||
TYPED_TEST(InterpreterTest, ProfileQueryWithLiterals) {
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 0U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 0U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 0U);
|
||||
auto stream = this->Interpret("PROFILE UNWIND range(1, 1000) AS x CREATE (:Node {id: x});", {});
|
||||
std::vector<std::string> expected_header{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"};
|
||||
@ -870,11 +872,11 @@ TYPED_TEST(InterpreterTest, ProfileQueryWithLiterals) {
|
||||
++expected_it;
|
||||
}
|
||||
// We should have a plan cache for UNWIND ...
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
// We should have AST cache for PROFILE ... and for inner UNWIND ...
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
this->Interpret("UNWIND range(42, 4242) AS x CREATE (:Node {id: x});", {});
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 2U);
|
||||
}
|
||||
|
||||
@ -1100,7 +1102,7 @@ TYPED_TEST(InterpreterTest, CacheableQueries) {
|
||||
SCOPED_TRACE("Cacheable query");
|
||||
this->Interpret("RETURN 1");
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
}
|
||||
|
||||
{
|
||||
@ -1109,7 +1111,7 @@ TYPED_TEST(InterpreterTest, CacheableQueries) {
|
||||
// result signature could be changed
|
||||
this->Interpret("CALL mg.load_all()");
|
||||
EXPECT_EQ(this->interpreter_context.ast_cache.size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->size(), 1U);
|
||||
EXPECT_EQ(this->db->plan_cache()->WithLock([&](auto &cache) { return cache.size(); }), 1U);
|
||||
}
|
||||
}
|
||||
|
||||
|
111
tests/unit/lru_cache.cpp
Normal file
111
tests/unit/lru_cache.cpp
Normal file
@ -0,0 +1,111 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "utils/lru_cache.hpp"
|
||||
#include <optional>
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
// Capacity-2 cache: verifies hits, misses, and least-recently-used eviction.
TEST(LRUCacheTest, BasicTest) {
  memgraph::utils::LRUCache<int, int> cache(2);
  cache.put(1, 1);
  cache.put(2, 2);

  // Touching key 1 makes key 2 the least-recently-used entry.
  const auto first = cache.get(1);
  EXPECT_TRUE(first.has_value());
  EXPECT_EQ(first.value(), 1);

  // Inserting a third entry pushes out key 2.
  cache.put(3, 3);
  EXPECT_FALSE(cache.get(2).has_value());

  // Inserting a fourth entry pushes out key 1 (key 3 is more recent).
  cache.put(4, 4);
  EXPECT_FALSE(cache.get(1).has_value());

  const auto third = cache.get(3);
  EXPECT_TRUE(third.has_value());
  EXPECT_EQ(third.value(), 3);

  const auto fourth = cache.get(4);
  EXPECT_TRUE(fourth.has_value());
  EXPECT_EQ(fourth.value(), 4);

  // The cache never grows past its configured capacity.
  EXPECT_EQ(cache.size(), 2);
}
|
||||
|
||||
// Re-inserting an existing key overwrites its value without evicting others.
TEST(LRUCacheTest, DuplicatePutTest) {
  memgraph::utils::LRUCache<int, int> cache(2);
  cache.put(1, 1);
  cache.put(2, 2);
  cache.put(1, 10);  // overwrite key 1 in place

  const auto updated = cache.get(1);
  EXPECT_TRUE(updated.has_value());
  EXPECT_EQ(updated.value(), 10);

  // Key 2 was not evicted by the duplicate put.
  const auto untouched = cache.get(2);
  EXPECT_TRUE(untouched.has_value());
  EXPECT_EQ(untouched.value(), 2);
}
|
||||
|
||||
// Inserting one entry past capacity evicts the least-recently-used key (1)
// while the two most recent keys (2, 3) remain retrievable.
// NOTE(review): despite the name, nothing here resizes the cache — this only
// exercises capacity eviction; consider renaming the test or calling an
// actual resize API if one exists. TODO confirm intent.
TEST(LRUCacheTest, ResizeTest) {
  memgraph::utils::LRUCache<int, int> cache(2);
  cache.put(1, 1);
  cache.put(2, 2);
  cache.put(3, 3);

  std::optional<int> value;
  // Key 1 was the oldest entry, so it is the one evicted.
  value = cache.get(1);
  EXPECT_FALSE(value.has_value());

  value = cache.get(2);
  EXPECT_TRUE(value.has_value());
  EXPECT_EQ(value.value(), 2);

  value = cache.get(3);
  EXPECT_TRUE(value.has_value());
  EXPECT_EQ(value.value(), 3);
}
|
||||
|
||||
// A lookup on an empty cache misses; a subsequent put makes the key visible.
TEST(LRUCacheTest, EmptyCacheTest) {
  memgraph::utils::LRUCache<int, int> cache(2);

  EXPECT_FALSE(cache.get(1).has_value());

  cache.put(1, 1);
  const auto stored = cache.get(1);
  EXPECT_TRUE(stored.has_value());
  EXPECT_EQ(stored.value(), 1);
}
|
||||
|
||||
// Fills a large cache exactly to capacity: every key misses before its
// insertion and hits (with the right value) afterwards — nothing is evicted.
TEST(LRUCacheTest, LargeCacheTest) {
  const int CACHE_SIZE = 10000;
  memgraph::utils::LRUCache<int, int> cache(CACHE_SIZE);

  for (int key = 0; key < CACHE_SIZE; key++) {
    EXPECT_FALSE(cache.get(key).has_value());  // absent before the put
    cache.put(key, key);
  }

  for (int key = 0; key < CACHE_SIZE; key++) {
    const auto stored = cache.get(key);
    EXPECT_TRUE(stored.has_value());
    EXPECT_EQ(stored.value(), key);
  }
}
|
@ -248,7 +248,7 @@ TYPED_TEST(ReadWriteTypeCheckTest, CallReadProcedureBeforeUpdate) {
|
||||
std::vector<Symbol> result_symbols{this->GetSymbol("name_alias"), this->GetSymbol("signature_alias")};
|
||||
|
||||
last_op = std::make_shared<plan::CallProcedure>(last_op, procedure_name, arguments, result_fields, result_symbols,
|
||||
nullptr, 0, false);
|
||||
nullptr, 0, false, 1);
|
||||
|
||||
this->CheckPlanType(last_op.get(), RWType::RW);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user