diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 9a902db4f..4f2bbcb61 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -24,6 +24,7 @@ #include "py/py.hpp" #include "query/exceptions.hpp" #include "query/interpreter.hpp" +#include "query/plan/operator.hpp" #include "query/procedure/module.hpp" #include "query/procedure/py_module.hpp" #include "requests/requests.hpp" @@ -999,6 +1000,8 @@ int main(int argc, char **argv) { } return ret; }); + telemetry->AddCollector("query_module_counters", + []() -> nlohmann::json { return query::plan::CallProcedure::GetAndResetCounters(); }); } // Handler for regular termination signals diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 26ffe1295..6b3ad4b47 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -3472,6 +3472,17 @@ std::vector CallProcedure::ModifiedSymbols(const SymbolTable &table) con return symbols; } +void CallProcedure::IncrementCounter(const std::string &procedure_name) { + procedure_counters_.WithLock([&](auto &counters) { ++counters[procedure_name]; }); +} + +std::unordered_map CallProcedure::GetAndResetCounters() { + auto counters = procedure_counters_.Lock(); + auto ret = std::move(*counters); + counters->clear(); + return ret; +} + namespace { std::optional EvalMemoryLimit(ExpressionEvaluator *eval, Expression *memory_limit, size_t memory_scale) { @@ -3648,6 +3659,7 @@ class CallProcedureCursor : public Cursor { frame[self_->result_symbols_[i]] = result_it->second; } ++result_row_it_; + return true; } @@ -3662,6 +3674,7 @@ class CallProcedureCursor : public Cursor { UniqueCursorPtr CallProcedure::MakeCursor(utils::MemoryResource *mem) const { EventCounter::IncrementCounter(EventCounter::CallProcedureOperator); + CallProcedure::IncrementCounter(procedure_name_); return MakeUniqueCursorPtr(mem, this, mem); } diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index b3f33f0c1..929cf4a97 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -2145,6 +2145,13 @@ at once. Instead, each call of the callback should return a single row of the ta void set_input(std::shared_ptr input) override { input_ = input; } + + static void IncrementCounter(const std::string &procedure_name); + static std::unordered_map GetAndResetCounters(); + cpp<#) + (:private + #>cpp + inline static utils::Synchronized, utils::SpinLock> procedure_counters_; cpp<#) (:serialize (:slk)) (:clone))