Clean up commented out code
This commit is contained in:
parent
632f0398b5
commit
708e745410
src/query/v2
@ -388,40 +388,7 @@ TypedValue Last(const TypedValue *args, int64_t nargs, const FunctionContext &ct
|
||||
return TypedValue(list.back(), ctx.memory);
|
||||
}
|
||||
|
||||
TypedValue Properties(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
|
||||
FType<Or<Null, Vertex, Edge>>("properties", args, nargs);
|
||||
auto *dba = ctx.db_accessor;
|
||||
// auto get_properties = [&](const auto &record_accessor) {
|
||||
// TypedValue::TMap properties(ctx.memory);
|
||||
// auto props = record_accessor.Properties();
|
||||
// // add error handling
|
||||
// // if (maybe_props.HasError()) {
|
||||
// // switch (maybe_props.GetError()) {
|
||||
// // case storage::v3::Error::DELETED_OBJECT:
|
||||
// // throw QueryRuntimeException("Trying to get properties from a deleted object.");
|
||||
// // case storage::v3::Error::NONEXISTENT_OBJECT:
|
||||
// // throw query::v2::QueryRuntimeException("Trying to get properties from an object that doesn't
|
||||
// exist.");
|
||||
// // case storage::v3::Error::SERIALIZATION_ERROR:
|
||||
// // case storage::v3::Error::VERTEX_HAS_EDGES:
|
||||
// // case storage::v3::Error::PROPERTIES_DISABLED:
|
||||
// // throw QueryRuntimeException("Unexpected error when getting properties.");
|
||||
// // }
|
||||
// for (const auto &property : props) {
|
||||
// properties.emplace(property.first, ValueToTypedValue(property.second));
|
||||
// }
|
||||
// return TypedValue(std::move(properties));
|
||||
// };
|
||||
// const auto &value = args[0];
|
||||
// if (value.IsNull()) {
|
||||
// return TypedValue(ctx.memory);
|
||||
// } else if (value.IsVertex()) {
|
||||
// return get_properties(value.ValueVertex());
|
||||
// } else {
|
||||
// return get_properties(value.ValueEdge());
|
||||
// }
|
||||
return {};
|
||||
}
|
||||
TypedValue Properties(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) { return {}; }
|
||||
|
||||
TypedValue Size(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
|
||||
FType<Or<Null, List, String, Map, Path>>("size", args, nargs);
|
||||
@ -472,8 +439,6 @@ TypedValue Degree(const TypedValue *args, int64_t nargs, const FunctionContext &
|
||||
FType<Or<Null, Vertex>>("degree", args, nargs);
|
||||
if (args[0].IsNull()) return TypedValue(ctx.memory);
|
||||
const auto &vertex = args[0].ValueVertex();
|
||||
// size_t out_degree = UnwrapDegreeResult(vertex.OutDegree(ctx.view));
|
||||
// size_t in_degree = UnwrapDegreeResult(vertex.InDegree(ctx.view));
|
||||
// TODO(kostasrim) Fix dummy values
|
||||
return TypedValue(int64_t(0), ctx.memory);
|
||||
}
|
||||
@ -482,7 +447,6 @@ TypedValue InDegree(const TypedValue *args, int64_t nargs, const FunctionContext
|
||||
FType<Or<Null, Vertex>>("inDegree", args, nargs);
|
||||
if (args[0].IsNull()) return TypedValue(ctx.memory);
|
||||
const auto &vertex = args[0].ValueVertex();
|
||||
// size_t in_degree = UnwrapDegreeResult(vertex.InDegree(ctx.view));
|
||||
return TypedValue(int64_t(0), ctx.memory);
|
||||
}
|
||||
|
||||
@ -490,7 +454,6 @@ TypedValue OutDegree(const TypedValue *args, int64_t nargs, const FunctionContex
|
||||
FType<Or<Null, Vertex>>("outDegree", args, nargs);
|
||||
if (args[0].IsNull()) return TypedValue(ctx.memory);
|
||||
const auto &vertex = args[0].ValueVertex();
|
||||
// size_t out_degree = UnwrapDegreeResult(vertex.OutDegree(ctx.view));
|
||||
return TypedValue(int64_t(0), ctx.memory);
|
||||
}
|
||||
|
||||
@ -599,83 +562,25 @@ TypedValue ValueType(const TypedValue *args, int64_t nargs, const FunctionContex
|
||||
// TODO: How is Keys different from Properties function?
|
||||
TypedValue Keys(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
|
||||
FType<Or<Null, Vertex, Edge>>("keys", args, nargs);
|
||||
auto *dba = ctx.db_accessor;
|
||||
// auto get_keys = [&](const auto &record_accessor) {
|
||||
// TypedValue::TVector keys(ctx.memory);
|
||||
// auto maybe_props = record_accessor.Properties();
|
||||
// // if (maybe_props.HasError()) {
|
||||
// // switch (maybe_props.GetError()) {
|
||||
// // case storage::v3::Error::DELETED_OBJECT:
|
||||
// // throw QueryRuntimeException("Trying to get keys from a deleted object.");
|
||||
// // case storage::v3::Error::NONEXISTENT_OBJECT:
|
||||
// // throw query::v2::QueryRuntimeException("Trying to get keys from an object that doesn't exist.");
|
||||
// // case storage::v3::Error::SERIALIZATION_ERROR:
|
||||
// // case storage::v3::Error::VERTEX_HAS_EDGES:
|
||||
// // case storage::v3::Error::PROPERTIES_DISABLED:
|
||||
// // throw QueryRuntimeException("Unexpected error when getting keys.");
|
||||
// // }
|
||||
// // }
|
||||
// for (const auto &property : maybe_props) {
|
||||
// keys.emplace_back(property.first);
|
||||
// }
|
||||
// return TypedValue(std::move(keys));
|
||||
// };
|
||||
// const auto &value = args[0];
|
||||
// if (value.IsNull()) {
|
||||
// return TypedValue(ctx.memory);
|
||||
// } else if (value.IsVertex()) {
|
||||
// return get_keys(value.ValueVertex());
|
||||
// } else {
|
||||
// return get_keys(value.ValueEdge());
|
||||
// }
|
||||
return TypedValue{};
|
||||
}
|
||||
|
||||
TypedValue Labels(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
|
||||
FType<Or<Null, Vertex>>("labels", args, nargs);
|
||||
auto *dba = ctx.db_accessor;
|
||||
if (args[0].IsNull()) return TypedValue(ctx.memory);
|
||||
// TypedValue::TVector labels(ctx.memory);
|
||||
return TypedValue();
|
||||
// auto maybe_labels = args[0].ValueVertex().Labels();
|
||||
// if (maybe_labels.HasError()) {
|
||||
// switch (maybe_labels.GetError()) {
|
||||
// case storage::v3::Error::DELETED_OBJECT:
|
||||
// throw QueryRuntimeException("Trying to get labels from a deleted node.");
|
||||
// case storage::v3::Error::NONEXISTENT_OBJECT:
|
||||
// throw query::v2::QueryRuntimeException("Trying to get labels from a node that doesn't exist.");
|
||||
// case storage::v3::Error::SERIALIZATION_ERROR:
|
||||
// case storage::v3::Error::VERTEX_HAS_EDGES:
|
||||
// case storage::v3::Error::PROPERTIES_DISABLED:
|
||||
// throw QueryRuntimeException("Unexpected error when getting labels.");
|
||||
// }
|
||||
// }
|
||||
// for (const auto &label : maybe_labels) {
|
||||
// labels.emplace_back(label);
|
||||
// }
|
||||
// return TypedValue(std::move(labels));
|
||||
}
|
||||
|
||||
TypedValue Nodes(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
|
||||
FType<Or<Null, Path>>("nodes", args, nargs);
|
||||
if (args[0].IsNull()) return TypedValue(ctx.memory);
|
||||
// const auto &vertices = args[0].ValuePath().vertices();
|
||||
return TypedValue();
|
||||
// TypedValue::TVector values(ctx.memory);
|
||||
// values.reserve(vertices.size());
|
||||
// for (const auto &v : vertices) values.emplace_back(v);
|
||||
// return TypedValue(std::move(values));
|
||||
}
|
||||
|
||||
TypedValue Relationships(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
|
||||
FType<Or<Null, Path>>("relationships", args, nargs);
|
||||
if (args[0].IsNull()) return TypedValue(ctx.memory);
|
||||
// const auto &edges = args[0].ValuePath().edges();
|
||||
return TypedValue();
|
||||
// TypedValue::TVector values(ctx.memory);
|
||||
// values.reserve(edges.size());
|
||||
// for (const auto &e : edges) values.emplace_back(e);
|
||||
// return TypedValue(std::move(values));
|
||||
}
|
||||
|
||||
TypedValue Range(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
|
||||
@ -1188,50 +1093,6 @@ TypedValue Duration(const TypedValue *args, int64_t nargs, const FunctionContext
|
||||
return TypedValue(utils::Duration(duration_parameters), ctx.memory);
|
||||
}
|
||||
|
||||
// std::function<TypedValue(const TypedValue *, const int64_t, const FunctionContext &)> UserFunction(
|
||||
// const mgp_func &func, const std::string &fully_qualified_name) {
|
||||
// return [func, fully_qualified_name](const TypedValue *args, int64_t nargs, const FunctionContext &ctx) ->
|
||||
// TypedValue {
|
||||
// /// Find function is called to aquire the lock on Module pointer while user-defined function is executed
|
||||
// const auto &maybe_found =
|
||||
// procedure::FindFunction(procedure::gModuleRegistry, fully_qualified_name, utils::NewDeleteResource());
|
||||
// if (!maybe_found) {
|
||||
// throw QueryRuntimeException(
|
||||
// "Function '{}' has been unloaded. Please check query modules to confirm that function is loaded in
|
||||
// Memgraph.", fully_qualified_name);
|
||||
// }
|
||||
//
|
||||
// const auto &func_cb = func.cb;
|
||||
// mgp_memory memory{ctx.memory};
|
||||
// mgp_func_context functx{ctx.db_accessor, ctx.view};
|
||||
// auto graph = mgp_graph::NonWritableGraph(*ctx.db_accessor, ctx.view);
|
||||
//
|
||||
// std::vector<TypedValue> args_list;
|
||||
// args_list.reserve(nargs);
|
||||
// for (std::size_t i = 0; i < nargs; ++i) {
|
||||
// args_list.emplace_back(args[i]);
|
||||
// }
|
||||
//
|
||||
// auto function_argument_list = mgp_list(ctx.memory);
|
||||
// procedure::ConstructArguments(args_list, func, fully_qualified_name, function_argument_list, graph);
|
||||
//
|
||||
// mgp_func_result maybe_res;
|
||||
// func_cb(&function_argument_list, &functx, &maybe_res, &memory);
|
||||
// if (maybe_res.error_msg) {
|
||||
// throw QueryRuntimeException(*maybe_res.error_msg);
|
||||
// }
|
||||
//
|
||||
// if (!maybe_res.value) {
|
||||
// throw QueryRuntimeException(
|
||||
// "Function '{}' didn't set the result nor the error message. Please either set the result by using "
|
||||
// "mgp_func_result_set_value or the error by using mgp_func_result_set_error_msg.",
|
||||
// fully_qualified_name);
|
||||
// }
|
||||
//
|
||||
// return {*(maybe_res.value), ctx.memory};
|
||||
// };
|
||||
// }
|
||||
|
||||
} // namespace
|
||||
|
||||
std::function<TypedValue(const TypedValue *, int64_t, const FunctionContext &ctx)> NameToFunction(
|
||||
@ -1318,14 +1179,6 @@ std::function<TypedValue(const TypedValue *, int64_t, const FunctionContext &ctx
|
||||
if (function_name == "LOCALDATETIME") return LocalDateTime;
|
||||
if (function_name == "DURATION") return Duration;
|
||||
|
||||
// const auto &maybe_found =
|
||||
// procedure::FindFunction(procedure::gModuleRegistry, function_name, utils::NewDeleteResource());
|
||||
//
|
||||
// if (maybe_found) {
|
||||
// const auto *func = (*maybe_found).second;
|
||||
// return UserFunction(*func, function_name);
|
||||
// }
|
||||
//
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
@ -41,8 +41,6 @@
|
||||
#include "query/v2/plan/planner.hpp"
|
||||
#include "query/v2/plan/profile.hpp"
|
||||
#include "query/v2/plan/vertex_count_cache.hpp"
|
||||
//#include "query/v2/stream/common.hpp"
|
||||
//#include "query/v2/trigger.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
@ -570,14 +568,6 @@ std::optional<std::string> StringPointerToOptional(const std::string *str) {
|
||||
return str == nullptr ? std::nullopt : std::make_optional(*str);
|
||||
}
|
||||
|
||||
// stream::CommonStreamInfo GetCommonStreamInfo(StreamQuery *stream_query, ExpressionEvaluator &evaluator) {
|
||||
// return {
|
||||
// .batch_interval = GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator)
|
||||
// .value_or(stream::kDefaultBatchInterval),
|
||||
// .batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_,
|
||||
// evaluator).value_or(stream::kDefaultBatchSize), .transformation_name = stream_query->transform_name_};
|
||||
// }
|
||||
//
|
||||
std::vector<std::string> EvaluateTopicNames(ExpressionEvaluator &evaluator,
|
||||
std::variant<Expression *, std::vector<std::string>> topic_variant) {
|
||||
return std::visit(utils::Overloaded{[&](Expression *expression) {
|
||||
@ -589,216 +579,6 @@ std::vector<std::string> EvaluateTopicNames(ExpressionEvaluator &evaluator,
|
||||
std::move(topic_variant));
|
||||
}
|
||||
|
||||
// Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, ExpressionEvaluator &evaluator,
|
||||
// InterpreterContext *interpreter_context,
|
||||
// const std::string *username) {
|
||||
// static constexpr std::string_view kDefaultConsumerGroup = "mg_consumer";
|
||||
// std::string consumer_group{stream_query->consumer_group_.empty() ? kDefaultConsumerGroup
|
||||
// : stream_query->consumer_group_};
|
||||
//
|
||||
// auto bootstrap = GetOptionalStringValue(stream_query->bootstrap_servers_, evaluator);
|
||||
// if (bootstrap && bootstrap->empty()) {
|
||||
// throw SemanticException("Bootstrap servers must not be an empty string!");
|
||||
// }
|
||||
// auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
|
||||
//
|
||||
// const auto get_config_map = [&evaluator](std::unordered_map<Expression *, Expression *> map,
|
||||
// std::string_view map_name) -> std::unordered_map<std::string, std::string>
|
||||
// {
|
||||
// std::unordered_map<std::string, std::string> config_map;
|
||||
// for (const auto [key_expr, value_expr] : map) {
|
||||
// const auto key = key_expr->Accept(evaluator);
|
||||
// const auto value = value_expr->Accept(evaluator);
|
||||
// if (!key.IsString() || !value.IsString()) {
|
||||
// throw SemanticException("{} must contain only string keys and values!", map_name);
|
||||
// }
|
||||
// config_map.emplace(key.ValueString(), value.ValueString());
|
||||
// }
|
||||
// return config_map;
|
||||
// };
|
||||
//
|
||||
// return [interpreter_context, stream_name = stream_query->stream_name_,
|
||||
// topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_),
|
||||
// consumer_group = std::move(consumer_group), common_stream_info = std::move(common_stream_info),
|
||||
// bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username),
|
||||
// configs = get_config_map(stream_query->configs_, "Configs"),
|
||||
// credentials = get_config_map(stream_query->credentials_, "Credentials")]() mutable {
|
||||
// std::string bootstrap = bootstrap_servers
|
||||
// ? std::move(*bootstrap_servers)
|
||||
// : std::string{interpreter_context->config.default_kafka_bootstrap_servers};
|
||||
// interpreter_context->streams.Create<query::v2::stream::KafkaStream>(stream_name,
|
||||
// {.common_info =
|
||||
// std::move(common_stream_info),
|
||||
// .topics = std::move(topic_names),
|
||||
// .consumer_group = std::move(consumer_group),
|
||||
// .bootstrap_servers = std::move(bootstrap),
|
||||
// .configs = std::move(configs),
|
||||
// .credentials = std::move(credentials)},
|
||||
// std::move(owner));
|
||||
//
|
||||
// return std::vector<std::vector<TypedValue>>{};
|
||||
// };
|
||||
// }
|
||||
//
|
||||
// Callback::CallbackFunction GetPulsarCreateCallback(StreamQuery *stream_query, ExpressionEvaluator &evaluator,
|
||||
// InterpreterContext *interpreter_context,
|
||||
// const std::string *username) {
|
||||
// auto service_url = GetOptionalStringValue(stream_query->service_url_, evaluator);
|
||||
// if (service_url && service_url->empty()) {
|
||||
// throw SemanticException("Service URL must not be an empty string!");
|
||||
// }
|
||||
// auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
|
||||
// return [interpreter_context, stream_name = stream_query->stream_name_,
|
||||
// topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_),
|
||||
// common_stream_info = std::move(common_stream_info), service_url = std::move(service_url),
|
||||
// owner = StringPointerToOptional(username)]() mutable {
|
||||
// std::string url =
|
||||
// service_url ? std::move(*service_url) : std::string{interpreter_context->config.default_pulsar_service_url};
|
||||
// interpreter_context->streams.Create<query::v2::stream::PulsarStream>(
|
||||
// stream_name,
|
||||
// {.common_info = std::move(common_stream_info), .topics = std::move(topic_names), .service_url =
|
||||
// std::move(url)}, std::move(owner));
|
||||
//
|
||||
// return std::vector<std::vector<TypedValue>>{};
|
||||
// };
|
||||
// }
|
||||
//
|
||||
// Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶meters,
|
||||
// InterpreterContext *interpreter_context, DbAccessor *db_accessor,
|
||||
// const std::string *username, std::vector<Notification> *notifications) {
|
||||
// expr::Frame<TypedValue> frame(0);
|
||||
// SymbolTable symbol_table;
|
||||
// EvaluationContext evaluation_context;
|
||||
// // TODO: MemoryResource for EvaluationContext, it should probably be passed as
|
||||
// // the argument to Callback.
|
||||
// evaluation_context.timestamp = QueryTimestamp();
|
||||
// evaluation_context.parameters = parameters;
|
||||
// ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
|
||||
//
|
||||
// Callback callback;
|
||||
// switch (stream_query->action_) {
|
||||
// case StreamQuery::Action::CREATE_STREAM: {
|
||||
// EventCounter::IncrementCounter(EventCounter::StreamsCreated);
|
||||
// switch (stream_query->type_) {
|
||||
// case StreamQuery::Type::KAFKA:
|
||||
// callback.fn = GetKafkaCreateCallback(stream_query, evaluator, interpreter_context, username);
|
||||
// break;
|
||||
// case StreamQuery::Type::PULSAR:
|
||||
// callback.fn = GetPulsarCreateCallback(stream_query, evaluator, interpreter_context, username);
|
||||
// break;
|
||||
// }
|
||||
// notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CREATE_STREAM,
|
||||
// fmt::format("Created stream {}.", stream_query->stream_name_));
|
||||
// return callback;
|
||||
// }
|
||||
// case StreamQuery::Action::START_STREAM: {
|
||||
// const auto batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator);
|
||||
// const auto timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator);
|
||||
//
|
||||
// if (batch_limit.has_value()) {
|
||||
// if (batch_limit.value() < 0) {
|
||||
// throw utils::BasicException("Parameter BATCH_LIMIT cannot hold negative value");
|
||||
// }
|
||||
//
|
||||
// callback.fn = [interpreter_context, stream_name = stream_query->stream_name_, batch_limit, timeout]() {
|
||||
// interpreter_context->streams.StartWithLimit(stream_name, static_cast<uint64_t>(batch_limit.value()),
|
||||
// timeout); return std::vector<std::vector<TypedValue>>{};
|
||||
// };
|
||||
// } else {
|
||||
// callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() {
|
||||
// interpreter_context->streams.Start(stream_name);
|
||||
// return std::vector<std::vector<TypedValue>>{};
|
||||
// };
|
||||
// notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_STREAM,
|
||||
// fmt::format("Started stream {}.", stream_query->stream_name_));
|
||||
// }
|
||||
// return callback;
|
||||
// }
|
||||
// case StreamQuery::Action::START_ALL_STREAMS: {
|
||||
// callback.fn = [interpreter_context]() {
|
||||
// interpreter_context->streams.StartAll();
|
||||
// return std::vector<std::vector<TypedValue>>{};
|
||||
// };
|
||||
// notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_ALL_STREAMS, "Started all streams.");
|
||||
// return callback;
|
||||
// }
|
||||
// case StreamQuery::Action::STOP_STREAM: {
|
||||
// callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() {
|
||||
// interpreter_context->streams.Stop(stream_name);
|
||||
// return std::vector<std::vector<TypedValue>>{};
|
||||
// };
|
||||
// notifications->emplace_back(SeverityLevel::INFO, NotificationCode::STOP_STREAM,
|
||||
// fmt::format("Stopped stream {}.", stream_query->stream_name_));
|
||||
// return callback;
|
||||
// }
|
||||
// case StreamQuery::Action::STOP_ALL_STREAMS: {
|
||||
// callback.fn = [interpreter_context]() {
|
||||
// interpreter_context->streams.StopAll();
|
||||
// return std::vector<std::vector<TypedValue>>{};
|
||||
// };
|
||||
// notifications->emplace_back(SeverityLevel::INFO, NotificationCode::STOP_ALL_STREAMS, "Stopped all streams.");
|
||||
// return callback;
|
||||
// }
|
||||
// case StreamQuery::Action::DROP_STREAM: {
|
||||
// callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() {
|
||||
// interpreter_context->streams.Drop(stream_name);
|
||||
// return std::vector<std::vector<TypedValue>>{};
|
||||
// };
|
||||
// notifications->emplace_back(SeverityLevel::INFO, NotificationCode::DROP_STREAM,
|
||||
// fmt::format("Dropped stream {}.", stream_query->stream_name_));
|
||||
// return callback;
|
||||
// }
|
||||
// case StreamQuery::Action::SHOW_STREAMS: {
|
||||
// callback.header = {"name", "type", "batch_interval", "batch_size", "transformation_name", "owner", "is
|
||||
// running"}; callback.fn = [interpreter_context]() {
|
||||
// auto streams_status = interpreter_context->streams.GetStreamInfo();
|
||||
// std::vector<std::vector<TypedValue>> results;
|
||||
// results.reserve(streams_status.size());
|
||||
// auto stream_info_as_typed_stream_info_emplace_in = [](auto &typed_status, const auto &stream_info) {
|
||||
// typed_status.emplace_back(stream_info.batch_interval.count());
|
||||
// typed_status.emplace_back(stream_info.batch_size);
|
||||
// typed_status.emplace_back(stream_info.transformation_name);
|
||||
// };
|
||||
//
|
||||
// for (const auto &status : streams_status) {
|
||||
// std::vector<TypedValue> typed_status;
|
||||
// typed_status.reserve(7);
|
||||
// typed_status.emplace_back(status.name);
|
||||
// typed_status.emplace_back(StreamSourceTypeToString(status.type));
|
||||
// stream_info_as_typed_stream_info_emplace_in(typed_status, status.info);
|
||||
// if (status.owner.has_value()) {
|
||||
// typed_status.emplace_back(*status.owner);
|
||||
// } else {
|
||||
// typed_status.emplace_back();
|
||||
// }
|
||||
// typed_status.emplace_back(status.is_running);
|
||||
// results.push_back(std::move(typed_status));
|
||||
// }
|
||||
//
|
||||
// return results;
|
||||
// };
|
||||
// return callback;
|
||||
// }
|
||||
// case StreamQuery::Action::CHECK_STREAM: {
|
||||
// callback.header = {"queries", "raw messages"};
|
||||
//
|
||||
// const auto batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator);
|
||||
// if (batch_limit.has_value() && batch_limit.value() < 0) {
|
||||
// throw utils::BasicException("Parameter BATCH_LIMIT cannot hold negative value");
|
||||
// }
|
||||
//
|
||||
// callback.fn = [interpreter_context, stream_name = stream_query->stream_name_,
|
||||
// timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator),
|
||||
// batch_limit]() mutable {
|
||||
// return interpreter_context->streams.Check(stream_name, timeout, batch_limit);
|
||||
// };
|
||||
// notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CHECK_STREAM,
|
||||
// fmt::format("Checked stream {}.", stream_query->stream_name_));
|
||||
// return callback;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶meters, DbAccessor *db_accessor) {
|
||||
expr::Frame<TypedValue> frame(0);
|
||||
SymbolTable symbol_table;
|
||||
@ -1631,172 +1411,6 @@ PreparedQuery PrepareFreeMemoryQuery(ParsedQuery parsed_query, const bool in_exp
|
||||
RWType::NONE};
|
||||
}
|
||||
|
||||
// TriggerEventType ToTriggerEventType(const TriggerQuery::EventType event_type) {
|
||||
// switch (event_type) {
|
||||
// case TriggerQuery::EventType::ANY:
|
||||
// return TriggerEventType::ANY;
|
||||
//
|
||||
// case TriggerQuery::EventType::CREATE:
|
||||
// return TriggerEventType::CREATE;
|
||||
//
|
||||
// case TriggerQuery::EventType::VERTEX_CREATE:
|
||||
// return TriggerEventType::VERTEX_CREATE;
|
||||
//
|
||||
// case TriggerQuery::EventType::EDGE_CREATE:
|
||||
// return TriggerEventType::EDGE_CREATE;
|
||||
//
|
||||
// case TriggerQuery::EventType::DELETE:
|
||||
// return TriggerEventType::DELETE;
|
||||
//
|
||||
// case TriggerQuery::EventType::VERTEX_DELETE:
|
||||
// return TriggerEventType::VERTEX_DELETE;
|
||||
//
|
||||
// case TriggerQuery::EventType::EDGE_DELETE:
|
||||
// return TriggerEventType::EDGE_DELETE;
|
||||
//
|
||||
// case TriggerQuery::EventType::UPDATE:
|
||||
// return TriggerEventType::UPDATE;
|
||||
//
|
||||
// case TriggerQuery::EventType::VERTEX_UPDATE:
|
||||
// return TriggerEventType::VERTEX_UPDATE;
|
||||
//
|
||||
// case TriggerQuery::EventType::EDGE_UPDATE:
|
||||
// return TriggerEventType::EDGE_UPDATE;
|
||||
// }
|
||||
// }
|
||||
|
||||
// Callback CreateTrigger(TriggerQuery *trigger_query,
|
||||
// const std::map<std::string, storage::v3::PropertyValue> &user_parameters,
|
||||
// InterpreterContext *interpreter_context, DbAccessor *dba, std::optional<std::string> owner) {
|
||||
// return {
|
||||
// {},
|
||||
// [trigger_name = std::move(trigger_query->trigger_name_), trigger_statement =
|
||||
// std::move(trigger_query->statement_),
|
||||
// event_type = trigger_query->event_type_, before_commit = trigger_query->before_commit_, interpreter_context,
|
||||
// dba, user_parameters, owner = std::move(owner)]() mutable -> std::vector<std::vector<TypedValue>> {
|
||||
// interpreter_context->trigger_store.AddTrigger(
|
||||
// std::move(trigger_name), trigger_statement, user_parameters, ToTriggerEventType(event_type),
|
||||
// before_commit ? TriggerPhase::BEFORE_COMMIT : TriggerPhase::AFTER_COMMIT,
|
||||
// &interpreter_context->ast_cache, dba, interpreter_context->config.query, std::move(owner),
|
||||
// interpreter_context->auth_checker);
|
||||
// return {};
|
||||
// }};
|
||||
// }
|
||||
//
|
||||
// Callback DropTrigger(TriggerQuery *trigger_query, InterpreterContext *interpreter_context) {
|
||||
// return {{},
|
||||
// [trigger_name = std::move(trigger_query->trigger_name_),
|
||||
// interpreter_context]() -> std::vector<std::vector<TypedValue>> {
|
||||
// interpreter_context->trigger_store.DropTrigger(trigger_name);
|
||||
// return {};
|
||||
// }};
|
||||
// }
|
||||
//
|
||||
// Callback ShowTriggers(InterpreterContext *interpreter_context) {
|
||||
// return {{"trigger name", "statement", "event type", "phase", "owner"}, [interpreter_context] {
|
||||
// std::vector<std::vector<TypedValue>> results;
|
||||
// auto trigger_infos = interpreter_context->trigger_store.GetTriggerInfo();
|
||||
// results.reserve(trigger_infos.size());
|
||||
// for (auto &trigger_info : trigger_infos) {
|
||||
// std::vector<TypedValue> typed_trigger_info;
|
||||
// typed_trigger_info.reserve(4);
|
||||
// typed_trigger_info.emplace_back(std::move(trigger_info.name));
|
||||
// typed_trigger_info.emplace_back(std::move(trigger_info.statement));
|
||||
// typed_trigger_info.emplace_back(TriggerEventTypeToString(trigger_info.event_type));
|
||||
// typed_trigger_info.emplace_back(trigger_info.phase == TriggerPhase::BEFORE_COMMIT ? "BEFORE COMMIT"
|
||||
// : "AFTER COMMIT");
|
||||
// typed_trigger_info.emplace_back(trigger_info.owner.has_value() ? TypedValue{*trigger_info.owner}
|
||||
// : TypedValue{});
|
||||
//
|
||||
// results.push_back(std::move(typed_trigger_info));
|
||||
// }
|
||||
//
|
||||
// return results;
|
||||
// }};
|
||||
// }
|
||||
//
|
||||
// PreparedQuery PrepareTriggerQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
|
||||
// std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
|
||||
// DbAccessor *dba,
|
||||
// const std::map<std::string, storage::v3::PropertyValue> &user_parameters,
|
||||
// const std::string *username) {
|
||||
// if (in_explicit_transaction) {
|
||||
// throw TriggerModificationInMulticommandTxException();
|
||||
// }
|
||||
//
|
||||
// auto *trigger_query = utils::Downcast<TriggerQuery>(parsed_query.query);
|
||||
// MG_ASSERT(trigger_query);
|
||||
//
|
||||
// std::optional<Notification> trigger_notification;
|
||||
// auto callback = std::invoke([trigger_query, interpreter_context, dba, &user_parameters,
|
||||
// owner = StringPointerToOptional(username), &trigger_notification]() mutable {
|
||||
// switch (trigger_query->action_) {
|
||||
// case TriggerQuery::Action::CREATE_TRIGGER:
|
||||
// trigger_notification.emplace(SeverityLevel::INFO, NotificationCode::CREATE_TRIGGER,
|
||||
// fmt::format("Created trigger {}.", trigger_query->trigger_name_));
|
||||
// EventCounter::IncrementCounter(EventCounter::TriggersCreated);
|
||||
// return CreateTrigger(trigger_query, user_parameters, interpreter_context, dba, std::move(owner));
|
||||
// case TriggerQuery::Action::DROP_TRIGGER:
|
||||
// trigger_notification.emplace(SeverityLevel::INFO, NotificationCode::DROP_TRIGGER,
|
||||
// fmt::format("Dropped trigger {}.", trigger_query->trigger_name_));
|
||||
// return DropTrigger(trigger_query, interpreter_context);
|
||||
// case TriggerQuery::Action::SHOW_TRIGGERS:
|
||||
// return ShowTriggers(interpreter_context);
|
||||
// }
|
||||
// });
|
||||
//
|
||||
// return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
|
||||
// [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr},
|
||||
// trigger_notification = std::move(trigger_notification), notifications](
|
||||
// AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
|
||||
// if (UNLIKELY(!pull_plan)) {
|
||||
// pull_plan = std::make_shared<PullPlanVector>(callback_fn());
|
||||
// }
|
||||
//
|
||||
// if (pull_plan->Pull(stream, n)) {
|
||||
// if (trigger_notification) {
|
||||
// notifications->push_back(std::move(*trigger_notification));
|
||||
// }
|
||||
// return QueryHandlerResult::COMMIT;
|
||||
// }
|
||||
// return std::nullopt;
|
||||
// },
|
||||
// RWType::NONE};
|
||||
// // False positive report for the std::make_shared above
|
||||
// // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks)
|
||||
// }
|
||||
|
||||
// PreparedQuery PrepareStreamQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
|
||||
// std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
|
||||
// DbAccessor *dba,
|
||||
// const std::map<std::string, storage::v3::PropertyValue> & /*user_parameters*/,
|
||||
// const std::string *username) {
|
||||
// if (in_explicit_transaction) {
|
||||
// throw StreamQueryInMulticommandTxException();
|
||||
// }
|
||||
//
|
||||
// auto *stream_query = utils::Downcast<StreamQuery>(parsed_query.query);
|
||||
// MG_ASSERT(stream_query);
|
||||
// auto callback =
|
||||
// HandleStreamQuery(stream_query, parsed_query.parameters, interpreter_context, dba, username, notifications);
|
||||
//
|
||||
// return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
|
||||
// [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
|
||||
// AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
|
||||
// if (UNLIKELY(!pull_plan)) {
|
||||
// pull_plan = std::make_shared<PullPlanVector>(callback_fn());
|
||||
// }
|
||||
//
|
||||
// if (pull_plan->Pull(stream, n)) {
|
||||
// return QueryHandlerResult::COMMIT;
|
||||
// }
|
||||
// return std::nullopt;
|
||||
// },
|
||||
// RWType::NONE};
|
||||
// // False positive report for the std::make_shared above
|
||||
// // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks)
|
||||
// }
|
||||
|
||||
constexpr auto ToStorageIsolationLevel(const IsolationLevelQuery::IsolationLevel isolation_level) noexcept {
|
||||
switch (isolation_level) {
|
||||
case IsolationLevelQuery::IsolationLevel::SNAPSHOT_ISOLATION:
|
||||
@ -2273,11 +1887,6 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
db_accessor_ = std::make_unique<storage::v3::Storage::Accessor>(
|
||||
interpreter_context_->db->Access(GetIsolationLevelOverride()));
|
||||
execution_db_accessor_.emplace(db_accessor_.get());
|
||||
|
||||
// if (utils::Downcast<CypherQuery>(parsed_query.query) && interpreter_context_->trigger_store.HasTriggers())
|
||||
// {
|
||||
// trigger_context_collector_.emplace(interpreter_context_->trigger_store.GetEventTypes());
|
||||
// }
|
||||
}
|
||||
|
||||
utils::Timer planning_timer;
|
||||
@ -2287,7 +1896,6 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
|
||||
&*execution_db_accessor_, &query_execution->execution_memory,
|
||||
&query_execution->notifications);
|
||||
// trigger_context_collector_ ? &*trigger_context_collector_ : nullptr);
|
||||
} else if (utils::Downcast<ExplainQuery>(parsed_query.query)) {
|
||||
prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
|
||||
&*execution_db_accessor_, &query_execution->execution_memory_with_exception);
|
||||
@ -2323,15 +1931,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
prepared_query = PrepareFreeMemoryQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
|
||||
} else if (utils::Downcast<TriggerQuery>(parsed_query.query)) {
|
||||
throw std::runtime_error("Unimplemented");
|
||||
// prepared_query =
|
||||
// PrepareTriggerQuery(std::move(parsed_query), in_explicit_transaction_,
|
||||
// &query_execution->notifications,
|
||||
// interpreter_context_, &*execution_db_accessor_, params, username);
|
||||
} else if (utils::Downcast<StreamQuery>(parsed_query.query)) {
|
||||
throw std::runtime_error("unimplemented");
|
||||
// prepared_query =
|
||||
// PrepareStreamQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
|
||||
// interpreter_context_, &*execution_db_accessor_, params, username);
|
||||
} else if (utils::Downcast<IsolationLevelQuery>(parsed_query.query)) {
|
||||
prepared_query =
|
||||
PrepareIsolationLevelQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_, this);
|
||||
@ -2379,59 +1980,8 @@ void Interpreter::Abort() {
|
||||
db_accessor_->Abort();
|
||||
execution_db_accessor_.reset();
|
||||
db_accessor_.reset();
|
||||
// trigger_context_collector_.reset();
|
||||
}
|
||||
|
||||
namespace {
|
||||
// void RunTriggersIndividually(const utils::SkipList<Trigger> &triggers, InterpreterContext *interpreter_context,
|
||||
// TriggerContext trigger_context) {
|
||||
// // Run the triggers
|
||||
// for (const auto &trigger : triggers.access()) {
|
||||
// utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
|
||||
//
|
||||
// // create a new transaction for each trigger
|
||||
// auto storage_acc = interpreter_context->db->Access();
|
||||
// DbAccessor db_accessor{&storage_acc};
|
||||
//
|
||||
// trigger_context.AdaptForAccessor(&db_accessor);
|
||||
// try {
|
||||
// trigger.Execute(&db_accessor, &execution_memory, interpreter_context->config.execution_timeout_sec,
|
||||
// &interpreter_context->is_shutting_down, trigger_context, interpreter_context->auth_checker);
|
||||
// } catch (const utils::BasicException &exception) {
|
||||
// spdlog::warn("Trigger '{}' failed with exception:\n{}", trigger.Name(), exception.what());
|
||||
// db_accessor.Abort();
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// auto maybe_constraint_violation = db_accessor.Commit();
|
||||
// if (maybe_constraint_violation.HasError()) {
|
||||
// const auto &constraint_violation = maybe_constraint_violation.GetError();
|
||||
// switch (constraint_violation.type) {
|
||||
// case storage::v3::ConstraintViolation::Type::EXISTENCE: {
|
||||
// const auto &label_name = db_accessor.LabelToName(constraint_violation.label);
|
||||
// MG_ASSERT(constraint_violation.properties.size() == 1U);
|
||||
// const auto &property_name = db_accessor.PropertyToName(*constraint_violation.properties.begin());
|
||||
// spdlog::warn("Trigger '{}' failed to commit due to existence constraint violation on :{}({})",
|
||||
// trigger.Name(), label_name, property_name);
|
||||
// break;
|
||||
// }
|
||||
// case storage::v3::ConstraintViolation::Type::UNIQUE: {
|
||||
// const auto &label_name = db_accessor.LabelToName(constraint_violation.label);
|
||||
// std::stringstream property_names_stream;
|
||||
// utils::PrintIterable(property_names_stream, constraint_violation.properties, ", ",
|
||||
// [&](auto &stream, const auto &prop) { stream << db_accessor.PropertyToName(prop);
|
||||
// });
|
||||
// spdlog::warn("Trigger '{}' failed to commit due to unique constraint violation on :{}({})",
|
||||
// trigger.Name(),
|
||||
// label_name, property_names_stream.str());
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
} // namespace
|
||||
|
||||
void Interpreter::Commit() {
|
||||
// It's possible that some queries did not finish because the user did
|
||||
// not pull all of the results from the query.
|
||||
@ -2440,36 +1990,6 @@ void Interpreter::Commit() {
|
||||
// a query.
|
||||
if (!db_accessor_) return;
|
||||
|
||||
// std::optional<TriggerContext> trigger_context = std::nullopt;
|
||||
// if (trigger_context_collector_) {
|
||||
// trigger_context.emplace(std::move(*trigger_context_collector_).TransformToTriggerContext());
|
||||
// trigger_context_collector_.reset();
|
||||
// }
|
||||
//
|
||||
// if (trigger_context) {
|
||||
// // Run the triggers
|
||||
// for (const auto &trigger : interpreter_context_->trigger_store.BeforeCommitTriggers().access()) {
|
||||
// utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
|
||||
// AdvanceCommand();
|
||||
// try {
|
||||
// trigger.Execute(&*execution_db_accessor_, &execution_memory,
|
||||
// interpreter_context_->config.execution_timeout_sec,
|
||||
// &interpreter_context_->is_shutting_down, *trigger_context,
|
||||
// interpreter_context_->auth_checker);
|
||||
// } catch (const utils::BasicException &e) {
|
||||
// throw utils::BasicException(
|
||||
// fmt::format("Trigger '{}' caused the transaction to fail.\nException: {}", trigger.Name(), e.what()));
|
||||
// }
|
||||
// }
|
||||
// SPDLOG_DEBUG("Finished executing before commit triggers");
|
||||
// }
|
||||
//
|
||||
// const auto reset_necessary_members = [this]() {
|
||||
// execution_db_accessor_.reset();
|
||||
// db_accessor_.reset();
|
||||
// trigger_context_collector_.reset();
|
||||
// };
|
||||
|
||||
auto maybe_constraint_violation = db_accessor_->Commit();
|
||||
if (maybe_constraint_violation.HasError()) {
|
||||
const auto &constraint_violation = maybe_constraint_violation.GetError();
|
||||
@ -2478,7 +1998,6 @@ void Interpreter::Commit() {
|
||||
auto label_name = execution_db_accessor_->LabelToName(constraint_violation.label);
|
||||
MG_ASSERT(constraint_violation.properties.size() == 1U);
|
||||
auto property_name = execution_db_accessor_->PropertyToName(*constraint_violation.properties.begin());
|
||||
// reset_necessary_members();
|
||||
throw QueryException("Unable to commit due to existence constraint violation on :{}({})", label_name,
|
||||
property_name);
|
||||
break;
|
||||
@ -2489,7 +2008,6 @@ void Interpreter::Commit() {
|
||||
utils::PrintIterable(
|
||||
property_names_stream, constraint_violation.properties, ", ",
|
||||
[this](auto &stream, const auto &prop) { stream << execution_db_accessor_->PropertyToName(prop); });
|
||||
// reset_necessary_members();
|
||||
throw QueryException("Unable to commit due to unique constraint violation on :{}({})", label_name,
|
||||
property_names_stream.str());
|
||||
break;
|
||||
@ -2497,24 +2015,6 @@ void Interpreter::Commit() {
|
||||
}
|
||||
}
|
||||
|
||||
// The ordered execution of after commit triggers is heavily depending on the exclusiveness of db_accessor_->Commit():
|
||||
// only one of the transactions can be commiting at the same time, so when the commit is finished, that transaction
|
||||
// probably will schedule its after commit triggers, because the other transactions that want to commit are still
|
||||
// waiting for commiting or one of them just started commiting its changes.
|
||||
// This means the ordered execution of after commit triggers are not guaranteed.
|
||||
// if (trigger_context && interpreter_context_->trigger_store.AfterCommitTriggers().size() > 0) {
|
||||
// interpreter_context_->after_commit_trigger_pool.AddTask(
|
||||
// [trigger_context = std::move(*trigger_context), interpreter_context = this->interpreter_context_,
|
||||
// user_transaction = std::shared_ptr(std::move(db_accessor_))]() mutable {
|
||||
// RunTriggersIndividually(interpreter_context->trigger_store.AfterCommitTriggers(), interpreter_context,
|
||||
// std::move(trigger_context));
|
||||
// user_transaction->FinalizeTransaction();
|
||||
// SPDLOG_DEBUG("Finished executing after commit triggers"); // NOLINT(bugprone-lambda-function-name)
|
||||
// });
|
||||
// }
|
||||
|
||||
// reset_necessary_members();
|
||||
|
||||
SPDLOG_DEBUG("Finished committing the transaction");
|
||||
}
|
||||
|
||||
|
@ -179,12 +179,7 @@ struct InterpreterContext {
|
||||
utils::SkipList<QueryCacheEntry> ast_cache;
|
||||
utils::SkipList<PlanCacheEntry> plan_cache;
|
||||
|
||||
// TriggerStore trigger_store;
|
||||
// utils::ThreadPool after_commit_trigger_pool{1};
|
||||
|
||||
const InterpreterConfig config;
|
||||
|
||||
// query::v2::stream::Streams streams;
|
||||
};
|
||||
|
||||
/// Function that is used to tell all active interpreters that they should stop
|
||||
@ -317,7 +312,6 @@ class Interpreter final {
|
||||
// move this unique_ptr into a shrared_ptr.
|
||||
std::unique_ptr<storage::v3::Storage::Accessor> db_accessor_;
|
||||
std::optional<DbAccessor> execution_db_accessor_;
|
||||
// std::optional<TriggerContextCollector> trigger_context_collector_;
|
||||
bool in_explicit_transaction_{false};
|
||||
bool expect_rollback_{false};
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user