Add query module counters to telemetry (#112)

This commit is contained in:
antonio2368 2021-03-18 11:03:42 +01:00 committed by GitHub
parent 77a0d7b8fa
commit 593f7a3499
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 0 deletions

View File

@ -24,6 +24,7 @@
#include "py/py.hpp"
#include "query/exceptions.hpp"
#include "query/interpreter.hpp"
#include "query/plan/operator.hpp"
#include "query/procedure/module.hpp"
#include "query/procedure/py_module.hpp"
#include "requests/requests.hpp"
@ -999,6 +1000,8 @@ int main(int argc, char **argv) {
}
return ret;
});
telemetry->AddCollector("query_module_counters",
[]() -> nlohmann::json { return query::plan::CallProcedure::GetAndResetCounters(); });
}
// Handler for regular termination signals

View File

@ -3472,6 +3472,17 @@ std::vector<Symbol> CallProcedure::ModifiedSymbols(const SymbolTable &table) con
return symbols;
}
void CallProcedure::IncrementCounter(const std::string &procedure_name) {
procedure_counters_.WithLock([&](auto &counters) { ++counters[procedure_name]; });
}
std::unordered_map<std::string, int64_t> CallProcedure::GetAndResetCounters() {
auto counters = procedure_counters_.Lock();
auto ret = std::move(*counters);
counters->clear();
return ret;
}
namespace {
std::optional<size_t> EvalMemoryLimit(ExpressionEvaluator *eval, Expression *memory_limit, size_t memory_scale) {
@ -3648,6 +3659,7 @@ class CallProcedureCursor : public Cursor {
frame[self_->result_symbols_[i]] = result_it->second;
}
++result_row_it_;
return true;
}
@ -3662,6 +3674,7 @@ class CallProcedureCursor : public Cursor {
UniqueCursorPtr CallProcedure::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::CallProcedureOperator);
CallProcedure::IncrementCounter(procedure_name_);
return MakeUniqueCursorPtr<CallProcedureCursor>(mem, this, mem);
}

View File

@ -2145,6 +2145,13 @@ at once. Instead, each call of the callback should return a single row of the ta
void set_input(std::shared_ptr<LogicalOperator> input) override {
input_ = input;
}
static void IncrementCounter(const std::string &procedure_name);
static std::unordered_map<std::string, int64_t> GetAndResetCounters();
cpp<#)
(:private
#>cpp
inline static utils::Synchronized<std::unordered_map<std::string, int64_t>, utils::SpinLock> procedure_counters_;
cpp<#)
(:serialize (:slk))
(:clone))