Add stats and notifications in results metadata (#285)

This commit is contained in:
Jure Bajic 2021-11-15 13:51:13 +01:00 committed by GitHub
parent 95dd3481c0
commit 16709dff6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 814 additions and 77 deletions

View File

@ -20,6 +20,7 @@ set(mg_query_sources
interpret/awesome_memgraph_functions.cpp
interpret/eval.cpp
interpreter.cpp
metadata.cpp
plan/operator.cpp
plan/preprocess.cpp
plan/pretty_print.cpp

View File

@ -15,6 +15,7 @@
#include "query/common.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/metadata.hpp"
#include "query/parameters.hpp"
#include "query/plan/profile.hpp"
#include "query/trigger.hpp"
@ -68,6 +69,7 @@ struct ExecutionContext {
std::chrono::duration<double> profile_execution_time;
plan::ProfilingStats stats;
plan::ProfilingStats *stats_root{nullptr};
ExecutionStats execution_stats;
TriggerContextCollector *trigger_context_collector{nullptr};
utils::AsyncTimer timer;
};

View File

@ -10,9 +10,14 @@
// licenses/APL.txt.
#include "query/interpreter.hpp"
#include <fmt/core.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
#include <optional>
@ -31,6 +36,7 @@
#include "query/frontend/semantic/required_privileges.hpp"
#include "query/frontend/semantic/symbol_generator.hpp"
#include "query/interpret/eval.hpp"
#include "query/metadata.hpp"
#include "query/plan/planner.hpp"
#include "query/plan/profile.hpp"
#include "query/plan/vertex_count_cache.hpp"
@ -405,7 +411,8 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
}
Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &parameters,
InterpreterContext *interpreter_context, DbAccessor *db_accessor) {
InterpreterContext *interpreter_context, DbAccessor *db_accessor,
std::vector<Notification> *notifications) {
Frame frame(0);
SymbolTable symbol_table;
EvaluationContext evaluation_context;
@ -423,11 +430,19 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
if (port.IsInt()) {
maybe_port = port.ValueInt();
}
if (maybe_port == 7687 && repl_query->role_ == ReplicationQuery::ReplicationRole::REPLICA) {
notifications->emplace_back(SeverityLevel::WARNING, NotificationCode::REPLICA_PORT_WARNING,
"Be careful the replication port must be different from the memgraph port!");
}
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, role = repl_query->role_,
maybe_port]() mutable {
handler.SetReplicationRole(role, maybe_port);
return std::vector<std::vector<TypedValue>>();
};
notifications->emplace_back(
SeverityLevel::INFO, NotificationCode::SET_REPLICA,
fmt::format("Replica role set to {}.",
repl_query->role_ == ReplicationQuery::ReplicationRole::MAIN ? "MAIN" : "REPLICA"));
return callback;
}
case ReplicationQuery::Action::SHOW_REPLICATION_ROLE: {
@ -461,6 +476,8 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout);
return std::vector<std::vector<TypedValue>>();
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::REGISTER_REPLICA,
fmt::format("Replica {} is registered.", repl_query->replica_name_));
return callback;
}
case ReplicationQuery::Action::DROP_REPLICA: {
@ -469,6 +486,8 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
handler.DropReplica(name);
return std::vector<std::vector<TypedValue>>();
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::DROP_REPLICA,
fmt::format("Replica {} is dropped.", repl_query->replica_name_));
return callback;
}
case ReplicationQuery::Action::SHOW_REPLICAS: {
@ -513,7 +532,7 @@ std::optional<std::string> StringPointerToOptional(const std::string *str) {
Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &parameters,
InterpreterContext *interpreter_context, DbAccessor *db_accessor,
const std::string *username) {
const std::string *username, std::vector<Notification> *notifications) {
Frame frame(0);
SymbolTable symbol_table;
EvaluationContext evaluation_context;
@ -553,6 +572,8 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
.bootstrap_servers = std::move(bootstrap)});
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CREATE_STREAM,
fmt::format("Created stream {}.", stream_query->stream_name_));
return callback;
}
case StreamQuery::Action::START_STREAM: {
@ -560,6 +581,8 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
interpreter_context->streams.Start(stream_name);
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_STREAM,
fmt::format("Started stream {}.", stream_query->stream_name_));
return callback;
}
case StreamQuery::Action::START_ALL_STREAMS: {
@ -567,6 +590,7 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
interpreter_context->streams.StartAll();
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_ALL_STREAMS, "Started all streams.");
return callback;
}
case StreamQuery::Action::STOP_STREAM: {
@ -574,6 +598,8 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
interpreter_context->streams.Stop(stream_name);
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::STOP_STREAM,
fmt::format("Stopped stream {}.", stream_query->stream_name_));
return callback;
}
case StreamQuery::Action::STOP_ALL_STREAMS: {
@ -581,6 +607,7 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
interpreter_context->streams.StopAll();
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::STOP_ALL_STREAMS, "Stopped all streams.");
return callback;
}
case StreamQuery::Action::DROP_STREAM: {
@ -588,6 +615,8 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
interpreter_context->streams.Drop(stream_name);
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::DROP_STREAM,
fmt::format("Dropped stream {}.", stream_query->stream_name_));
return callback;
}
case StreamQuery::Action::SHOW_STREAMS: {
@ -656,6 +685,8 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator)]() mutable {
return interpreter_context->streams.Check(stream_name, timeout, batch_limit);
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CHECK_STREAM,
fmt::format("Checked stream {}.", stream_query->stream_name_));
return callback;
}
}
@ -886,8 +917,19 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
if (has_unsent_results_) {
return std::nullopt;
}
summary->insert_or_assign("plan_execution_time", execution_time_.count());
// We are finished with pulling all the data, therefore we can send any
// metadata about the results i.e. notifications and statistics
const bool is_any_counter_set =
std::any_of(ctx_.execution_stats.counters.begin(), ctx_.execution_stats.counters.end(),
[](const auto &counter) { return counter > 0; });
if (is_any_counter_set) {
std::map<std::string, TypedValue> stats;
for (size_t i = 0; i < ctx_.execution_stats.counters.size(); ++i) {
stats.emplace(ExecutionStatsKeyToString(ExecutionStats::Key(i)), ctx_.execution_stats.counters[i]);
}
summary->insert_or_assign("stats", std::move(stats));
}
cursor_->Shutdown();
ctx_.profile_execution_time = execution_time_;
return GetStatsWithTotalTime(ctx_);
@ -971,7 +1013,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
InterpreterContext *interpreter_context, DbAccessor *dba,
utils::MemoryResource *execution_memory,
utils::MemoryResource *execution_memory, std::vector<Notification> *notifications,
TriggerContextCollector *trigger_context_collector = nullptr) {
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
@ -986,6 +1028,15 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
spdlog::info("Running query with memory limit of {}", utils::GetReadableSize(*memory_limit));
}
if (const auto &clauses = cypher_query->single_query_->clauses_; std::any_of(
clauses.begin(), clauses.end(), [](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) {
notifications->emplace_back(
SeverityLevel::INFO, NotificationCode::LOAD_CSV_TIP,
"It's important to note that the parser parses the values as strings. It's up to the user to "
"convert the parsed row values to the appropriate type. This can be done using the built-in "
"conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
}
auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query,
parsed_query.parameters,
parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, dba);
@ -1006,7 +1057,6 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
header.push_back(
utils::FindOr(parsed_query.stripped_query.named_expressions(), symbol.token_position(), symbol.name()).first);
}
auto pull_plan = std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context,
execution_memory, trigger_context_collector, memory_limit);
return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges),
@ -1164,14 +1214,13 @@ PreparedQuery PrepareDumpQuery(ParsedQuery parsed_query, std::map<std::string, T
}
PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
utils::MemoryResource *execution_memory) {
std::vector<Notification> *notifications, InterpreterContext *interpreter_context) {
if (in_explicit_transaction) {
throw IndexInMulticommandTxException();
}
auto *index_query = utils::Downcast<IndexQuery>(parsed_query.query);
std::function<void()> handler;
std::function<void(Notification &)> handler;
// Creating an index influences computed plan costs.
auto invalidate_plan_cache = [plan_cache = &interpreter_context->plan_cache] {
@ -1182,26 +1231,45 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
};
auto label = interpreter_context->db->NameToLabel(index_query->label_.name);
std::vector<storage::PropertyId> properties;
std::vector<std::string> properties_string;
properties.reserve(index_query->properties_.size());
properties_string.reserve(index_query->properties_.size());
for (const auto &prop : index_query->properties_) {
properties.push_back(interpreter_context->db->NameToProperty(prop.name));
properties_string.push_back(prop.name);
}
auto properties_stringified = utils::Join(properties_string, ", ");
if (properties.size() > 1) {
throw utils::NotYetImplemented("index on multiple properties");
}
Notification index_notification(SeverityLevel::INFO);
switch (index_query->action_) {
case IndexQuery::Action::CREATE: {
handler = [interpreter_context, label, properties = std::move(properties),
invalidate_plan_cache = std::move(invalidate_plan_cache)] {
index_notification.code = NotificationCode::CREATE_INDEX;
index_notification.title =
fmt::format("Created index on label {} on properties {}.", index_query->label_.name, properties_stringified);
handler = [interpreter_context, label, properties_stringified = std::move(properties_stringified),
label_name = index_query->label_.name, properties = std::move(properties),
invalidate_plan_cache = std::move(invalidate_plan_cache)](Notification &index_notification) {
if (properties.empty()) {
interpreter_context->db->CreateIndex(label);
if (!interpreter_context->db->CreateIndex(label)) {
index_notification.code = NotificationCode::EXISTANT_INDEX;
index_notification.title =
fmt::format("Index on label {} on properties {} already exists.", label_name, properties_stringified);
}
EventCounter::IncrementCounter(EventCounter::LabelIndexCreated);
} else {
MG_ASSERT(properties.size() == 1U);
interpreter_context->db->CreateIndex(label, properties[0]);
if (!interpreter_context->db->CreateIndex(label, properties[0])) {
index_notification.code = NotificationCode::EXISTANT_INDEX;
index_notification.title =
fmt::format("Index on label {} on properties {} already exists.", label_name, properties_stringified);
}
EventCounter::IncrementCounter(EventCounter::LabelPropertyIndexCreated);
}
invalidate_plan_cache();
@ -1209,13 +1277,25 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
break;
}
case IndexQuery::Action::DROP: {
handler = [interpreter_context, label, properties = std::move(properties),
invalidate_plan_cache = std::move(invalidate_plan_cache)] {
index_notification.code = NotificationCode::DROP_INDEX;
index_notification.title = fmt::format("Dropped index on label {} on properties {}.", index_query->label_.name,
utils::Join(properties_string, ", "));
handler = [interpreter_context, label, properties_stringified = std::move(properties_stringified),
label_name = index_query->label_.name, properties = std::move(properties),
invalidate_plan_cache = std::move(invalidate_plan_cache)](Notification &index_notification) {
if (properties.empty()) {
interpreter_context->db->DropIndex(label);
if (!interpreter_context->db->DropIndex(label)) {
index_notification.code = NotificationCode::NONEXISTANT_INDEX;
index_notification.title =
fmt::format("Index on label {} on properties {} doesn't exist.", label_name, properties_stringified);
}
} else {
MG_ASSERT(properties.size() == 1U);
interpreter_context->db->DropIndex(label, properties[0]);
if (!interpreter_context->db->DropIndex(label, properties[0])) {
index_notification.code = NotificationCode::NONEXISTANT_INDEX;
index_notification.title =
fmt::format("Index on label {} on properties {} doesn't exist.", label_name, properties_stringified);
}
}
invalidate_plan_cache();
};
@ -1223,13 +1303,16 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
}
}
return PreparedQuery{{},
std::move(parsed_query.required_privileges),
[handler = std::move(handler)](AnyStream *stream, std::optional<int>) {
handler();
return QueryHandlerResult::NOTHING;
},
RWType::W};
return PreparedQuery{
{},
std::move(parsed_query.required_privileges),
[handler = std::move(handler), notifications, index_notification = std::move(index_notification)](
AnyStream * /*stream*/, std::optional<int> /*unused*/) mutable {
handler(index_notification);
notifications->push_back(index_notification);
return QueryHandlerResult::NOTHING;
},
RWType::W};
}
PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
@ -1269,13 +1352,15 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa
}
PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
InterpreterContext *interpreter_context, DbAccessor *dba) {
std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
DbAccessor *dba) {
if (in_explicit_transaction) {
throw ReplicationModificationInMulticommandTxException();
}
auto *replication_query = utils::Downcast<ReplicationQuery>(parsed_query.query);
auto callback = HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, dba);
auto callback =
HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, dba, notifications);
return PreparedQuery{callback.header, std::move(parsed_query.required_privileges),
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
@ -1424,8 +1509,8 @@ Callback ShowTriggers(InterpreterContext *interpreter_context) {
}
PreparedQuery PrepareTriggerQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
InterpreterContext *interpreter_context, DbAccessor *dba,
const std::map<std::string, storage::PropertyValue> &user_parameters,
std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
DbAccessor *dba, const std::map<std::string, storage::PropertyValue> &user_parameters,
const std::string *username) {
if (in_explicit_transaction) {
throw TriggerModificationInMulticommandTxException();
@ -1434,27 +1519,36 @@ PreparedQuery PrepareTriggerQuery(ParsedQuery parsed_query, const bool in_explic
auto *trigger_query = utils::Downcast<TriggerQuery>(parsed_query.query);
MG_ASSERT(trigger_query);
auto callback = [trigger_query, interpreter_context, dba, &user_parameters,
owner = StringPointerToOptional(username)]() mutable {
std::optional<Notification> trigger_notification;
auto callback = std::invoke([trigger_query, interpreter_context, dba, &user_parameters,
owner = StringPointerToOptional(username), &trigger_notification]() mutable {
switch (trigger_query->action_) {
case TriggerQuery::Action::CREATE_TRIGGER:
trigger_notification.emplace(SeverityLevel::INFO, NotificationCode::CREATE_TRIGGER,
fmt::format("Created trigger {}.", trigger_query->trigger_name_));
EventCounter::IncrementCounter(EventCounter::TriggersCreated);
return CreateTrigger(trigger_query, user_parameters, interpreter_context, dba, std::move(owner));
case TriggerQuery::Action::DROP_TRIGGER:
trigger_notification.emplace(SeverityLevel::INFO, NotificationCode::DROP_TRIGGER,
fmt::format("Dropped trigger {}.", trigger_query->trigger_name_));
return DropTrigger(trigger_query, interpreter_context);
case TriggerQuery::Action::SHOW_TRIGGERS:
return ShowTriggers(interpreter_context);
}
}();
});
return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr},
trigger_notification = std::move(trigger_notification), notifications](
AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
if (UNLIKELY(!pull_plan)) {
pull_plan = std::make_shared<PullPlanVector>(callback_fn());
}
if (pull_plan->Pull(stream, n)) {
if (trigger_notification) {
notifications->push_back(std::move(*trigger_notification));
}
return QueryHandlerResult::COMMIT;
}
return std::nullopt;
@ -1465,8 +1559,9 @@ PreparedQuery PrepareTriggerQuery(ParsedQuery parsed_query, const bool in_explic
}
PreparedQuery PrepareStreamQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
InterpreterContext *interpreter_context, DbAccessor *dba,
const std::map<std::string, storage::PropertyValue> &user_parameters,
std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
DbAccessor *dba,
const std::map<std::string, storage::PropertyValue> & /*user_parameters*/,
const std::string *username) {
if (in_explicit_transaction) {
throw StreamQueryInMulticommandTxException();
@ -1474,7 +1569,8 @@ PreparedQuery PrepareStreamQuery(ParsedQuery parsed_query, const bool in_explici
auto *stream_query = utils::Downcast<StreamQuery>(parsed_query.query);
MG_ASSERT(stream_query);
auto callback = HandleStreamQuery(stream_query, parsed_query.parameters, interpreter_context, dba, username);
auto callback =
HandleStreamQuery(stream_query, parsed_query.parameters, interpreter_context, dba, username, notifications);
return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
@ -1674,24 +1770,31 @@ PreparedQuery PrepareInfoQuery(ParsedQuery parsed_query, bool in_explicit_transa
}
PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary,
InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory) {
std::vector<Notification> *notifications,
InterpreterContext *interpreter_context) {
if (in_explicit_transaction) {
throw ConstraintInMulticommandTxException();
}
auto *constraint_query = utils::Downcast<ConstraintQuery>(parsed_query.query);
std::function<void()> handler;
std::function<void(Notification &)> handler;
auto label = interpreter_context->db->NameToLabel(constraint_query->constraint_.label.name);
std::vector<storage::PropertyId> properties;
std::vector<std::string> properties_string;
properties.reserve(constraint_query->constraint_.properties.size());
properties_string.reserve(constraint_query->constraint_.properties.size());
for (const auto &prop : constraint_query->constraint_.properties) {
properties.push_back(interpreter_context->db->NameToProperty(prop.name));
properties_string.push_back(prop.name);
}
auto properties_stringified = utils::Join(properties_string, ", ");
Notification constraint_notification(SeverityLevel::INFO);
switch (constraint_query->action_type_) {
case ConstraintQuery::ActionType::CREATE: {
constraint_notification.code = NotificationCode::CREATE_CONSTRAINT;
switch (constraint_query->constraint_.type) {
case Constraint::Type::NODE_KEY:
throw utils::NotYetImplemented("Node key constraints");
@ -1699,7 +1802,11 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_
if (properties.empty() || properties.size() > 1) {
throw SyntaxException("Exactly one property must be used for existence constraints.");
}
handler = [interpreter_context, label, properties = std::move(properties)] {
constraint_notification.title = fmt::format("Created EXISTS constraint on label {} on properties {}.",
constraint_query->constraint_.label.name, properties_stringified);
handler = [interpreter_context, label, label_name = constraint_query->constraint_.label.name,
properties_stringified = std::move(properties_stringified),
properties = std::move(properties)](Notification &constraint_notification) {
auto res = interpreter_context->db->CreateExistenceConstraint(label, properties[0]);
if (res.HasError()) {
auto violation = res.GetError();
@ -1711,6 +1818,11 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_
"existing node violates it.",
label_name, property_name);
}
if (res.HasValue() && !res.GetValue()) {
constraint_notification.code = NotificationCode::EXISTANT_CONSTRAINT;
constraint_notification.title = fmt::format(
"Constraint EXISTS on label {} on properties {} already exists.", label_name, properties_stringified);
}
};
break;
case Constraint::Type::UNIQUE:
@ -1721,7 +1833,12 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_
if (property_set.size() != properties.size()) {
throw SyntaxException("The given set of properties contains duplicates.");
}
handler = [interpreter_context, label, property_set = std::move(property_set)] {
constraint_notification.title =
fmt::format("Created UNIQUE constraint on label {} on properties {}.",
constraint_query->constraint_.label.name, utils::Join(properties_string, ", "));
handler = [interpreter_context, label, label_name = constraint_query->constraint_.label.name,
properties_stringified = std::move(properties_stringified),
property_set = std::move(property_set)](Notification &constraint_notification) {
auto res = interpreter_context->db->CreateUniqueConstraint(label, property_set);
if (res.HasError()) {
auto violation = res.GetError();
@ -1735,29 +1852,33 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_
"Unable to create unique constraint :{}({}), because an "
"existing node violates it.",
label_name, property_names_stream.str());
} else {
switch (res.GetValue()) {
case storage::UniqueConstraints::CreationStatus::EMPTY_PROPERTIES:
throw SyntaxException(
"At least one property must be used for unique "
"constraints.");
break;
case storage::UniqueConstraints::CreationStatus::PROPERTIES_SIZE_LIMIT_EXCEEDED:
throw SyntaxException(
"Too many properties specified. Limit of {} properties "
"for unique constraints is exceeded.",
storage::kUniqueConstraintsMaxProperties);
break;
case storage::UniqueConstraints::CreationStatus::ALREADY_EXISTS:
case storage::UniqueConstraints::CreationStatus::SUCCESS:
break;
}
}
switch (res.GetValue()) {
case storage::UniqueConstraints::CreationStatus::EMPTY_PROPERTIES:
throw SyntaxException(
"At least one property must be used for unique "
"constraints.");
case storage::UniqueConstraints::CreationStatus::PROPERTIES_SIZE_LIMIT_EXCEEDED:
throw SyntaxException(
"Too many properties specified. Limit of {} properties "
"for unique constraints is exceeded.",
storage::kUniqueConstraintsMaxProperties);
case storage::UniqueConstraints::CreationStatus::ALREADY_EXISTS:
constraint_notification.code = NotificationCode::EXISTANT_CONSTRAINT;
constraint_notification.title =
fmt::format("Constraint UNIQUE on label {} on properties {} already exists.", label_name,
properties_stringified);
break;
case storage::UniqueConstraints::CreationStatus::SUCCESS:
break;
}
};
break;
}
} break;
case ConstraintQuery::ActionType::DROP: {
constraint_notification.code = NotificationCode::DROP_CONSTRAINT;
switch (constraint_query->constraint_.type) {
case Constraint::Type::NODE_KEY:
throw utils::NotYetImplemented("Node key constraints");
@ -1765,8 +1886,17 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_
if (properties.empty() || properties.size() > 1) {
throw SyntaxException("Exactly one property must be used for existence constraints.");
}
handler = [interpreter_context, label, properties = std::move(properties)] {
interpreter_context->db->DropExistenceConstraint(label, properties[0]);
constraint_notification.title =
fmt::format("Dropped EXISTS constraint on label {} on properties {}.",
constraint_query->constraint_.label.name, utils::Join(properties_string, ", "));
handler = [interpreter_context, label, label_name = constraint_query->constraint_.label.name,
properties_stringified = std::move(properties_stringified),
properties = std::move(properties)](Notification &constraint_notification) {
if (!interpreter_context->db->DropExistenceConstraint(label, properties[0])) {
constraint_notification.code = NotificationCode::NONEXISTANT_CONSTRAINT;
constraint_notification.title = fmt::format(
"Constraint EXISTS on label {} on properties {} doesn't exist.", label_name, properties_stringified);
}
return std::vector<std::vector<TypedValue>>();
};
break;
@ -1778,7 +1908,12 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_
if (property_set.size() != properties.size()) {
throw SyntaxException("The given set of properties contains duplicates.");
}
handler = [interpreter_context, label, property_set = std::move(property_set)] {
constraint_notification.title =
fmt::format("Dropped UNIQUE constraint on label {} on properties {}.",
constraint_query->constraint_.label.name, utils::Join(properties_string, ", "));
handler = [interpreter_context, label, label_name = constraint_query->constraint_.label.name,
properties_stringified = std::move(properties_stringified),
property_set = std::move(property_set)](Notification &constraint_notification) {
auto res = interpreter_context->db->DropUniqueConstraint(label, property_set);
switch (res) {
case storage::UniqueConstraints::DeletionStatus::EMPTY_PROPERTIES:
@ -1793,6 +1928,11 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_
storage::kUniqueConstraintsMaxProperties);
break;
case storage::UniqueConstraints::DeletionStatus::NOT_FOUND:
constraint_notification.code = NotificationCode::NONEXISTANT_CONSTRAINT;
constraint_notification.title =
fmt::format("Constraint UNIQUE on label {} on properties {} doesn't exist.", label_name,
properties_stringified);
break;
case storage::UniqueConstraints::DeletionStatus::SUCCESS:
break;
}
@ -1804,8 +1944,10 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_
return PreparedQuery{{},
std::move(parsed_query.required_privileges),
[handler = std::move(handler)](AnyStream *stream, std::optional<int> n) {
handler();
[handler = std::move(handler), constraint_notification = std::move(constraint_notification),
notifications](AnyStream * /*stream*/, std::optional<int> /*n*/) mutable {
handler(constraint_notification);
notifications->push_back(constraint_notification);
return QueryHandlerResult::COMMIT;
},
RWType::NONE};
@ -1891,6 +2033,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
if (utils::Downcast<CypherQuery>(parsed_query.query)) {
prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
&*execution_db_accessor_, &query_execution->execution_memory,
&query_execution->notifications,
trigger_context_collector_ ? &*trigger_context_collector_ : nullptr);
} else if (utils::Downcast<ExplainQuery>(parsed_query.query)) {
prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
@ -1903,8 +2046,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
prepared_query = PrepareDumpQuery(std::move(parsed_query), &query_execution->summary, &*execution_db_accessor_,
&query_execution->execution_memory);
} else if (utils::Downcast<IndexQuery>(parsed_query.query)) {
prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
interpreter_context_, &query_execution->execution_memory_with_exception);
prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_,
&query_execution->notifications, interpreter_context_);
} else if (utils::Downcast<AuthQuery>(parsed_query.query)) {
prepared_query = PrepareAuthQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
interpreter_context_, &*execution_db_accessor_,
@ -1914,23 +2057,25 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
interpreter_context_, interpreter_context_->db,
&query_execution->execution_memory_with_exception);
} else if (utils::Downcast<ConstraintQuery>(parsed_query.query)) {
prepared_query =
PrepareConstraintQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
interpreter_context_, &query_execution->execution_memory_with_exception);
prepared_query = PrepareConstraintQuery(std::move(parsed_query), in_explicit_transaction_,
&query_execution->notifications, interpreter_context_);
} else if (utils::Downcast<ReplicationQuery>(parsed_query.query)) {
prepared_query = PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
&*execution_db_accessor_);
prepared_query =
PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
interpreter_context_, &*execution_db_accessor_);
} else if (utils::Downcast<LockPathQuery>(parsed_query.query)) {
prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
&*execution_db_accessor_);
} else if (utils::Downcast<FreeMemoryQuery>(parsed_query.query)) {
prepared_query = PrepareFreeMemoryQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
} else if (utils::Downcast<TriggerQuery>(parsed_query.query)) {
prepared_query = PrepareTriggerQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
&*execution_db_accessor_, params, username);
prepared_query =
PrepareTriggerQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
interpreter_context_, &*execution_db_accessor_, params, username);
} else if (utils::Downcast<StreamQuery>(parsed_query.query)) {
prepared_query = PrepareStreamQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
&*execution_db_accessor_, params, username);
prepared_query =
PrepareStreamQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
interpreter_context_, &*execution_db_accessor_, params, username);
} else if (utils::Downcast<IsolationLevelQuery>(parsed_query.query)) {
prepared_query =
PrepareIsolationLevelQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_, this);

View File

@ -23,6 +23,7 @@
#include "query/frontend/ast/cypher_main_visitor.hpp"
#include "query/frontend/stripped.hpp"
#include "query/interpret/frame.hpp"
#include "query/metadata.hpp"
#include "query/plan/operator.hpp"
#include "query/plan/read_write_type_checker.hpp"
#include "query/stream.hpp"
@ -285,6 +286,7 @@ class Interpreter final {
utils::ResourceWithOutOfMemoryException execution_memory_with_exception{&execution_memory};
std::map<std::string, TypedValue> summary;
std::vector<Notification> notifications;
explicit QueryExecution() = default;
QueryExecution(const QueryExecution &) = delete;
@ -377,6 +379,14 @@ std::map<std::string, TypedValue> Interpreter::Pull(TStream *result_stream, std:
if (maybe_res) {
// Save its summary
maybe_summary.emplace(std::move(query_execution->summary));
if (!query_execution->notifications.empty()) {
std::vector<TypedValue> notifications;
notifications.reserve(query_execution->notifications.size());
for (const auto &notification : query_execution->notifications) {
notifications.emplace_back(notification.ConvertToMap());
}
maybe_summary->insert_or_assign("notifications", std::move(notifications));
}
if (!in_explicit_transaction_) {
switch (*maybe_res) {
case QueryHandlerResult::COMMIT:

117
src/query/metadata.cpp Normal file
View File

@ -0,0 +1,117 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "query/metadata.hpp"
#include <algorithm>
#include <compare>
#include <string>
#include <string_view>
namespace query {
namespace {
using namespace std::literals;
constexpr std::string_view GetSeverityLevelString(const SeverityLevel level) {
switch (level) {
case SeverityLevel::INFO:
return "INFO"sv;
case SeverityLevel::WARNING:
return "WARNING"sv;
}
}
constexpr std::string_view GetCodeString(const NotificationCode code) {
switch (code) {
case NotificationCode::CREATE_CONSTRAINT:
return "CreateConstraint"sv;
case NotificationCode::CREATE_INDEX:
return "CreateIndex"sv;
case NotificationCode::CREATE_STREAM:
return "CreateStream"sv;
case NotificationCode::CHECK_STREAM:
return "CheckStream"sv;
case NotificationCode::CREATE_TRIGGER:
return "CreateTrigger"sv;
case NotificationCode::DROP_CONSTRAINT:
return "DropConstraint"sv;
case NotificationCode::DROP_REPLICA:
return "DropReplica"sv;
case NotificationCode::DROP_INDEX:
return "DropIndex"sv;
case NotificationCode::DROP_STREAM:
return "DropStream"sv;
case NotificationCode::DROP_TRIGGER:
return "DropTrigger"sv;
case NotificationCode::EXISTANT_CONSTRAINT:
return "ConstraintAlreadyExists"sv;
case NotificationCode::EXISTANT_INDEX:
return "IndexAlreadyExists"sv;
case NotificationCode::LOAD_CSV_TIP:
return "LoadCSVTip"sv;
case NotificationCode::NONEXISTANT_INDEX:
return "IndexDoesNotExist"sv;
case NotificationCode::NONEXISTANT_CONSTRAINT:
return "ConstraintDoesNotExist"sv;
case NotificationCode::REGISTER_REPLICA:
return "RegisterReplica"sv;
case NotificationCode::REPLICA_PORT_WARNING:
return "ReplicaPortWarning"sv;
case NotificationCode::SET_REPLICA:
return "SetReplica"sv;
case NotificationCode::START_STREAM:
return "StartStream"sv;
case NotificationCode::START_ALL_STREAMS:
return "StartAllStreams"sv;
case NotificationCode::STOP_STREAM:
return "StopStream"sv;
case NotificationCode::STOP_ALL_STREAMS:
return "StopAllStreams"sv;
}
}
} // namespace
Notification::Notification(SeverityLevel level) : level{level} {};
Notification::Notification(SeverityLevel level, NotificationCode code, std::string title, std::string description)
: level{level}, code{code}, title(std::move(title)), description(std::move(description)){};
Notification::Notification(SeverityLevel level, NotificationCode code, std::string title)
: level{level}, code{code}, title(std::move(title)){};
std::map<std::string, TypedValue> Notification::ConvertToMap() const {
return std::map<std::string, TypedValue>{{"severity", TypedValue(GetSeverityLevelString(level))},
{"code", TypedValue(GetCodeString(code))},
{"title", TypedValue(title)},
{"description", TypedValue(description)}};
}
std::string ExecutionStatsKeyToString(const ExecutionStats::Key key) {
switch (key) {
case ExecutionStats::Key::CREATED_NODES:
return std::string("nodes-created");
case ExecutionStats::Key::DELETED_NODES:
return std::string("nodes-deleted");
case ExecutionStats::Key::CREATED_EDGES:
return std::string("relationships-created");
case ExecutionStats::Key::DELETED_EDGES:
return std::string("relationships-deleted");
case ExecutionStats::Key::CREATED_LABELS:
return std::string("labels-added");
case ExecutionStats::Key::DELETED_LABELS:
return std::string("labels-removed");
case ExecutionStats::Key::UPDATED_PROPERTIES:
return std::string("properties-set");
}
}
} // namespace query

90
src/query/metadata.hpp Normal file
View File

@ -0,0 +1,90 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
#include <map>
#include <string>
#include <string_view>
#include <type_traits>
#include "query/typed_value.hpp"
namespace query {
enum class SeverityLevel : uint8_t { INFO, WARNING };
enum class NotificationCode : uint8_t {
CREATE_CONSTRAINT,
CREATE_INDEX,
CHECK_STREAM,
CREATE_STREAM,
CREATE_TRIGGER,
DROP_CONSTRAINT,
DROP_INDEX,
DROP_REPLICA,
DROP_STREAM,
DROP_TRIGGER,
EXISTANT_INDEX,
EXISTANT_CONSTRAINT,
LOAD_CSV_TIP,
NONEXISTANT_INDEX,
NONEXISTANT_CONSTRAINT,
REPLICA_PORT_WARNING,
REGISTER_REPLICA,
SET_REPLICA,
START_STREAM,
START_ALL_STREAMS,
STOP_STREAM,
STOP_ALL_STREAMS,
};
struct Notification {
SeverityLevel level;
NotificationCode code;
std::string title;
std::string description;
explicit Notification(SeverityLevel level);
Notification(SeverityLevel level, NotificationCode code, std::string title, std::string description);
Notification(SeverityLevel level, NotificationCode code, std::string title);
std::map<std::string, TypedValue> ConvertToMap() const;
};
struct ExecutionStats {
public:
// All the stats have specific key to be compatible with neo4j
enum class Key : uint8_t {
CREATED_NODES,
DELETED_NODES,
CREATED_EDGES,
DELETED_EDGES,
CREATED_LABELS,
DELETED_LABELS,
UPDATED_PROPERTIES,
};
int64_t &operator[](Key key) { return counters[static_cast<size_t>(key)]; }
private:
static constexpr auto kExecutionStatsCountersSize = std::underlying_type_t<Key>(Key::UPDATED_PROPERTIES) + 1;
public:
std::array<int64_t, kExecutionStatsCountersSize> counters{0};
};
std::string ExecutionStatsKeyToString(ExecutionStats::Key key);
} // namespace query

View File

@ -12,6 +12,7 @@
#include "query/plan/operator.hpp"
#include <algorithm>
#include <cstdint>
#include <limits>
#include <queue>
#include <random>
@ -171,9 +172,10 @@ CreateNode::CreateNode(const std::shared_ptr<LogicalOperator> &input, const Node
// Creates a vertex on this GraphDb. Returns a reference to vertex placed on the
// frame.
VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, Frame *frame, const ExecutionContext &context) {
VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, Frame *frame, ExecutionContext &context) {
auto &dba = *context.db_accessor;
auto new_node = dba.InsertVertex();
context.execution_stats[ExecutionStats::Key::CREATED_NODES] += 1;
for (auto label : node_info.labels) {
auto maybe_error = new_node.AddLabel(label);
if (maybe_error.HasError()) {
@ -188,6 +190,7 @@ VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, Frame *fram
throw QueryRuntimeException("Unexpected error when setting a label.");
}
}
context.execution_stats[ExecutionStats::Key::CREATED_LABELS] += 1;
}
// Evaluator should use the latest accessors, as modified in this query, when
// setting properties on new nodes.
@ -346,6 +349,7 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, ExecutionContext &cont
}
}();
context.execution_stats[ExecutionStats::Key::CREATED_EDGES] += 1;
if (context.trigger_context_collector) {
context.trigger_context_collector->RegisterCreatedObject(created_edge);
}
@ -1925,7 +1929,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
throw QueryRuntimeException("Unexpected error when deleting an edge.");
}
}
context.execution_stats[ExecutionStats::Key::DELETED_EDGES] += 1;
if (context.trigger_context_collector && maybe_value.GetValue()) {
context.trigger_context_collector->RegisterDeletedObject(*maybe_value.GetValue());
}
@ -1952,6 +1956,10 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
}
}
context.execution_stats[ExecutionStats::Key::DELETED_NODES] += 1;
if (*res) {
context.execution_stats[ExecutionStats::Key::DELETED_EDGES] += static_cast<int64_t>((*res)->second.size());
}
std::invoke([&] {
if (!context.trigger_context_collector || !*res) {
return;
@ -1979,7 +1987,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
throw QueryRuntimeException("Unexpected error when deleting a node.");
}
}
context.execution_stats[ExecutionStats::Key::DELETED_NODES] += 1;
if (context.trigger_context_collector && res.GetValue()) {
context.trigger_context_collector->RegisterDeletedObject(*res.GetValue());
}
@ -2038,7 +2046,7 @@ bool SetProperty::SetPropertyCursor::Pull(Frame &frame, ExecutionContext &contex
switch (lhs.type()) {
case TypedValue::Type::Vertex: {
auto old_value = PropsSetChecked(&lhs.ValueVertex(), self_.property_, rhs);
context.execution_stats[ExecutionStats::Key::UPDATED_PROPERTIES] += 1;
if (context.trigger_context_collector) {
// rhs cannot be moved because it was created with the allocator that is only valid during current pull
context.trigger_context_collector->RegisterSetObjectProperty(lhs.ValueVertex(), self_.property_,
@ -2048,7 +2056,7 @@ bool SetProperty::SetPropertyCursor::Pull(Frame &frame, ExecutionContext &contex
}
case TypedValue::Type::Edge: {
auto old_value = PropsSetChecked(&lhs.ValueEdge(), self_.property_, rhs);
context.execution_stats[ExecutionStats::Key::UPDATED_PROPERTIES] += 1;
if (context.trigger_context_collector) {
// rhs cannot be moved because it was created with the allocator that is only valid during current pull
context.trigger_context_collector->RegisterSetObjectProperty(lhs.ValueEdge(), self_.property_,
@ -2430,6 +2438,7 @@ bool RemoveLabels::RemoveLabelsCursor::Pull(Frame &frame, ExecutionContext &cont
}
}
context.execution_stats[ExecutionStats::Key::DELETED_LABELS] += 1;
if (context.trigger_context_collector && *maybe_value) {
context.trigger_context_collector->RegisterRemovedVertexLabel(vertex, label);
}

View File

@ -60,6 +60,7 @@ class LabelIndex {
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices);
/// Returns false if there was no index to drop
bool DropIndex(LabelId label) { return index_.erase(label) > 0; }
bool IndexExists(LabelId label) const { return index_.find(label) != index_.end(); }

View File

@ -20,6 +20,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <algorithm>
#include <cstdlib>
#include <filesystem>
@ -1106,3 +1107,364 @@ TEST_F(InterpreterTest, AllowLoadCsvConfig) {
check_load_csv_queries(true);
check_load_csv_queries(false);
}
void AssertAllValuesAreZero(const std::map<std::string, communication::bolt::Value> &map,
const std::vector<std::string> &exceptions) {
for (const auto &[key, value] : map) {
if (const auto it = std::find(exceptions.begin(), exceptions.end(), key); it != exceptions.end()) continue;
ASSERT_EQ(value.ValueInt(), 0);
}
}
TEST_F(InterpreterTest, ExecutionStatsIsValid) {
{
auto [stream, qid] = Prepare("MATCH (n) DELETE n;");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("stats"), 0);
}
{
std::array stats_keys{"nodes-created", "nodes-deleted", "relationships-created", "relationships-deleted",
"properties-set", "labels-added", "labels-removed"};
auto [stream, qid] = Prepare("CREATE ();");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("stats"), 1);
ASSERT_TRUE(stream.GetSummary().at("stats").IsMap());
auto stats = stream.GetSummary().at("stats").ValueMap();
ASSERT_TRUE(
std::all_of(stats_keys.begin(), stats_keys.end(), [&stats](const auto &key) { return stats.contains(key); }));
AssertAllValuesAreZero(stats, {"nodes-created"});
}
}
TEST_F(InterpreterTest, ExecutionStatsValues) {
{
auto [stream, qid] = Prepare("CREATE (),(),(),();");
Pull(&stream);
auto stats = stream.GetSummary().at("stats").ValueMap();
ASSERT_EQ(stats["nodes-created"].ValueInt(), 4);
AssertAllValuesAreZero(stats, {"nodes-created"});
}
{
auto [stream, qid] = Prepare("MATCH (n) DELETE n;");
Pull(&stream);
auto stats = stream.GetSummary().at("stats").ValueMap();
ASSERT_EQ(stats["nodes-deleted"].ValueInt(), 4);
AssertAllValuesAreZero(stats, {"nodes-deleted"});
}
{
auto [stream, qid] = Prepare("CREATE (n)-[:TO]->(m), (n)-[:TO]->(m), (n)-[:TO]->(m);");
Pull(&stream);
auto stats = stream.GetSummary().at("stats").ValueMap();
ASSERT_EQ(stats["nodes-created"].ValueInt(), 2);
ASSERT_EQ(stats["relationships-created"].ValueInt(), 3);
AssertAllValuesAreZero(stats, {"nodes-created", "relationships-created"});
}
{
auto [stream, qid] = Prepare("MATCH (n) DETACH DELETE n;");
Pull(&stream);
auto stats = stream.GetSummary().at("stats").ValueMap();
ASSERT_EQ(stats["nodes-deleted"].ValueInt(), 2);
ASSERT_EQ(stats["relationships-deleted"].ValueInt(), 3);
AssertAllValuesAreZero(stats, {"nodes-deleted", "relationships-deleted"});
}
{
auto [stream, qid] = Prepare("CREATE (:L1:L2:L3), (:L1), (:L1), (:L2);");
Pull(&stream);
auto stats = stream.GetSummary().at("stats").ValueMap();
ASSERT_EQ(stats["nodes-created"].ValueInt(), 4);
ASSERT_EQ(stats["labels-added"].ValueInt(), 6);
AssertAllValuesAreZero(stats, {"nodes-created", "labels-added"});
}
{
auto [stream, qid] = Prepare("MATCH (n:L1) SET n.name='test';");
Pull(&stream);
auto stats = stream.GetSummary().at("stats").ValueMap();
ASSERT_EQ(stats["properties-set"].ValueInt(), 3);
AssertAllValuesAreZero(stats, {"properties-set"});
}
}
TEST_F(InterpreterTest, NotificationsValidStructure) {
{
auto [stream, qid] = Prepare("MATCH (n) DELETE n;");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 0);
}
{
auto [stream, qid] = Prepare("CREATE INDEX ON :Person(id);");
Pull(&stream);
// Assert notifications list
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
ASSERT_TRUE(stream.GetSummary().at("notifications").IsList());
auto notifications = stream.GetSummary().at("notifications").ValueList();
// Assert one notification structure
ASSERT_EQ(notifications.size(), 1);
ASSERT_TRUE(notifications[0].IsMap());
auto notification = notifications[0].ValueMap();
ASSERT_TRUE(notification.contains("severity"));
ASSERT_TRUE(notification.contains("code"));
ASSERT_TRUE(notification.contains("title"));
ASSERT_TRUE(notification.contains("description"));
ASSERT_TRUE(notification["severity"].IsString());
ASSERT_TRUE(notification["code"].IsString());
ASSERT_TRUE(notification["title"].IsString());
ASSERT_TRUE(notification["description"].IsString());
}
}
TEST_F(InterpreterTest, IndexInfoNotifications) {
{
auto [stream, qid] = Prepare("CREATE INDEX ON :Person;");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "CreateIndex");
ASSERT_EQ(notification["title"].ValueString(), "Created index on label Person on properties .");
ASSERT_EQ(notification["description"].ValueString(), "");
}
{
auto [stream, qid] = Prepare("CREATE INDEX ON :Person(id);");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "CreateIndex");
ASSERT_EQ(notification["title"].ValueString(), "Created index on label Person on properties id.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
{
auto [stream, qid] = Prepare("CREATE INDEX ON :Person(id);");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "IndexAlreadyExists");
ASSERT_EQ(notification["title"].ValueString(), "Index on label Person on properties id already exists.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
{
auto [stream, qid] = Prepare("DROP INDEX ON :Person(id);");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "DropIndex");
ASSERT_EQ(notification["title"].ValueString(), "Dropped index on label Person on properties id.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
{
auto [stream, qid] = Prepare("DROP INDEX ON :Person(id);");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "IndexDoesNotExist");
ASSERT_EQ(notification["title"].ValueString(), "Index on label Person on properties id doesn't exist.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
}
TEST_F(InterpreterTest, ConstraintUniqueInfoNotifications) {
{
auto [stream, qid] = Prepare("CREATE CONSTRAINT ON (n:Person) ASSERT n.email, n.id IS UNIQUE;");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "CreateConstraint");
ASSERT_EQ(notification["title"].ValueString(),
"Created UNIQUE constraint on label Person on properties email, id.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
{
auto [stream, qid] = Prepare("CREATE CONSTRAINT ON (n:Person) ASSERT n.email, n.id IS UNIQUE;");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "ConstraintAlreadyExists");
ASSERT_EQ(notification["title"].ValueString(),
"Constraint UNIQUE on label Person on properties email, id already exists.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
{
auto [stream, qid] = Prepare("DROP CONSTRAINT ON (n:Person) ASSERT n.email, n.id IS UNIQUE;");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "DropConstraint");
ASSERT_EQ(notification["title"].ValueString(),
"Dropped UNIQUE constraint on label Person on properties email, id.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
{
auto [stream, qid] = Prepare("DROP CONSTRAINT ON (n:Person) ASSERT n.email, n.id IS UNIQUE;");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "ConstraintDoesNotExist");
ASSERT_EQ(notification["title"].ValueString(),
"Constraint UNIQUE on label Person on properties email, id doesn't exist.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
}
TEST_F(InterpreterTest, ConstraintExistsInfoNotifications) {
{
auto [stream, qid] = Prepare("CREATE CONSTRAINT ON (n:L1) ASSERT EXISTS (n.name);");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "CreateConstraint");
ASSERT_EQ(notification["title"].ValueString(), "Created EXISTS constraint on label L1 on properties name.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
{
auto [stream, qid] = Prepare("CREATE CONSTRAINT ON (n:L1) ASSERT EXISTS (n.name);");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "ConstraintAlreadyExists");
ASSERT_EQ(notification["title"].ValueString(), "Constraint EXISTS on label L1 on properties name already exists.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
{
auto [stream, qid] = Prepare("DROP CONSTRAINT ON (n:L1) ASSERT EXISTS (n.name);");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "DropConstraint");
ASSERT_EQ(notification["title"].ValueString(), "Dropped EXISTS constraint on label L1 on properties name.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
{
auto [stream, qid] = Prepare("DROP CONSTRAINT ON (n:L1) ASSERT EXISTS (n.name);");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "ConstraintDoesNotExist");
ASSERT_EQ(notification["title"].ValueString(), "Constraint EXISTS on label L1 on properties name doesn't exist.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
}
TEST_F(InterpreterTest, TriggerInfoNotifications) {
{
auto [stream, qid] = Prepare(
"CREATE TRIGGER bestTriggerEver ON CREATE AFTER COMMIT EXECUTE "
"CREATE ();");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "CreateTrigger");
ASSERT_EQ(notification["title"].ValueString(), "Created trigger bestTriggerEver.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
{
auto [stream, qid] = Prepare("DROP TRIGGER bestTriggerEver;");
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "DropTrigger");
ASSERT_EQ(notification["title"].ValueString(), "Dropped trigger bestTriggerEver.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
}
TEST_F(InterpreterTest, LoadCsvClauseNotification) {
auto dir_manager = TmpDirManager("csv_directory");
const auto csv_path = dir_manager.Path() / "file.csv";
auto writer = FileWriter(csv_path);
const std::string delimiter{"|"};
const std::vector<std::string> header{"A", "B", "C"};
writer.WriteLine(CreateRow(header, delimiter));
const std::vector<std::string> good_columns_1{"a", "b", "c"};
writer.WriteLine(CreateRow(good_columns_1, delimiter));
writer.Close();
const std::string query = fmt::format(R"(LOAD CSV FROM "{}" WITH HEADER IGNORE BAD DELIMITER "{}" AS x RETURN x;)",
csv_path.string(), delimiter);
auto [stream, qid] = Prepare(query);
Pull(&stream);
ASSERT_EQ(stream.GetSummary().count("notifications"), 1);
auto notifications = stream.GetSummary().at("notifications").ValueList();
auto notification = notifications[0].ValueMap();
ASSERT_EQ(notification["severity"].ValueString(), "INFO");
ASSERT_EQ(notification["code"].ValueString(), "LoadCSVTip");
ASSERT_EQ(notification["title"].ValueString(),
"It's important to note that the parser parses the values as strings. It's up to the user to "
"convert the parsed row values to the appropriate type. This can be done using the built-in "
"conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
ASSERT_EQ(notification["description"].ValueString(), "");
}