Add telemetry for streams and triggers ()

* Add event counters to streams and triggers
This commit is contained in:
Kostas Kyrimis 2021-07-06 18:44:47 +03:00 committed by Antonio Andelic
parent ad32db5168
commit 2e1a717dcb
4 changed files with 22 additions and 1 deletions

View File

@ -43,6 +43,9 @@ extern Event ReadWriteQuery;
extern const Event LabelIndexCreated;
extern const Event LabelPropertyIndexCreated;
extern const Event StreamsCreated;
extern const Event TriggersCreated;
} // namespace EventCounter
namespace query {
@ -476,6 +479,7 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
Callback callback;
switch (stream_query->action_) {
case StreamQuery::Action::CREATE_STREAM: {
EventCounter::IncrementCounter(EventCounter::StreamsCreated);
constexpr std::string_view kDefaultConsumerGroup = "mg_consumer";
std::string consumer_group{stream_query->consumer_group_.empty() ? kDefaultConsumerGroup
: stream_query->consumer_group_};
@ -1282,6 +1286,7 @@ PreparedQuery PrepareTriggerQuery(ParsedQuery parsed_query, const bool in_explic
auto callback = [trigger_query, interpreter_context, dba, &user_parameters] {
switch (trigger_query->action_) {
case TriggerQuery::Action::CREATE_TRIGGER:
EventCounter::IncrementCounter(EventCounter::TriggersCreated);
return CreateTrigger(trigger_query, user_parameters, interpreter_context, dba);
case TriggerQuery::Action::DROP_TRIGGER:
return DropTrigger(trigger_query, interpreter_context);

View File

@ -12,10 +12,15 @@
#include "query/procedure/mg_procedure_impl.hpp"
#include "query/procedure/module.hpp"
#include "query/typed_value.hpp"
#include "utils/event_counter.hpp"
#include "utils/memory.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/pmr/string.hpp"
namespace EventCounter {
extern const Event MessagesConsumed;
} // namespace EventCounter
namespace query {
using Consumer = integrations::kafka::Consumer;
@ -335,6 +340,7 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
result = mgp_result{nullptr, memory_resource}](
const std::vector<integrations::kafka::Message> &messages) mutable {
auto accessor = interpreter_context->db->Access();
EventCounter::IncrementCounter(EventCounter::MessagesConsumed, messages.size());
CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
DiscardValueResultStream stream;

View File

@ -11,8 +11,13 @@
#include "query/serialization/property_value.hpp"
#include "query/typed_value.hpp"
#include "storage/v2/property_value.hpp"
#include "utils/event_counter.hpp"
#include "utils/memory.hpp"
namespace EventCounter {
extern const Event TriggersExecuted;
} // namespace EventCounter
namespace query {
namespace {
auto IdentifierString(const TriggerIdentifierTag tag) noexcept {
@ -228,6 +233,7 @@ void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution
;
cursor->Shutdown();
EventCounter::IncrementCounter(EventCounter::TriggersExecuted);
}
namespace {

View File

@ -41,7 +41,11 @@
\
M(FailedQuery, "Number of times executing a query failed.") \
M(LabelIndexCreated, "Number of times a label index was created.") \
M(LabelPropertyIndexCreated, "Number of times a label property index was created.")
M(LabelPropertyIndexCreated, "Number of times a label property index was created.") \
M(StreamsCreated, "Number of Streams created.") \
M(MessagesConsumed, "Number of consumed streamed messages.") \
M(TriggersCreated, "Number of Triggers created.") \
M(TriggersExecuted, "Number of Triggers executed.")
namespace EventCounter {