Implement query engine client (#531)

- Add shard request manager
This commit is contained in:
Kostas Kyrimis 2022-09-22 15:05:43 +02:00 committed by GitHub
parent ce788f5f65
commit 925835b080
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 2009 additions and 2403 deletions

View File

@ -54,6 +54,11 @@ struct GetShardMapResponse {
ShardMap shard_map;
};
struct AllocateHlcBatchRequest {
Hlc low;
Hlc high;
};
struct AllocateHlcBatchResponse {
bool success;
Hlc low;

View File

@ -21,5 +21,4 @@ using memgraph::io::rsm::RsmClient;
template <typename IoImpl>
using CoordinatorClient = RsmClient<IoImpl, CoordinatorWriteRequests, CoordinatorWriteResponses,
CoordinatorReadRequests, CoordinatorReadResponses>;
} // namespace memgraph::coordinator

View File

@ -48,6 +48,7 @@ enum class Status : uint8_t {
struct AddressAndStatus {
memgraph::io::Address address;
Status status;
friend bool operator<(const AddressAndStatus &lhs, const AddressAndStatus &rhs) { return lhs.address < rhs.address; }
};
using PrimaryKey = std::vector<PropertyValue>;
@ -82,6 +83,12 @@ struct ShardMap {
std::map<LabelId, LabelSpace> label_spaces;
std::map<LabelId, std::vector<SchemaProperty>> schemas;
Shards GetShards(const LabelName &label) {
const auto id = labels.at(label);
auto &shards = label_spaces.at(id).shards;
return shards;
}
// TODO(gabor) later we will want to update the wallclock time with
// the given Io<impl>'s time as well
Hlc IncrementShardMapVersion() noexcept {
@ -183,6 +190,8 @@ struct ShardMap {
// Find a random place for the server to plug in
}
LabelId GetLabelId(const std::string &label) const { return labels.at(label); }
Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const {
MG_ASSERT(start_key <= end_key);
MG_ASSERT(labels.contains(label_name));
@ -219,6 +228,17 @@ struct ShardMap {
return std::prev(label_space.shards.upper_bound(key))->second;
}
Shard GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const {
MG_ASSERT(label_spaces.contains(label_id));
const auto &label_space = label_spaces.at(label_id);
MG_ASSERT(label_space.shards.begin()->first <= key,
"the ShardMap must always contain a minimal key that is less than or equal to any requested key");
return std::prev(label_space.shards.upper_bound(key))->second;
}
PropertyMap AllocatePropertyIds(const std::vector<PropertyName> &new_properties) {
PropertyMap ret{};

View File

@ -17,6 +17,7 @@
#include <map>
#include <optional>
#include <regex>
#include <type_traits>
#include <vector>
#include "expr/ast.hpp"
@ -27,8 +28,11 @@
namespace memgraph::expr {
struct StorageTag {};
struct QueryEngineTag {};
template <typename TypedValue, typename EvaluationContext, typename DbAccessor, typename StorageView, typename LabelId,
typename PropertyValue, typename ConvFunctor, typename Error>
typename PropertyValue, typename ConvFunctor, typename Error, typename Tag = StorageTag>
class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
public:
ExpressionEvaluator(Frame<TypedValue> *frame, const SymbolTable &symbol_table, const EvaluationContext &ctx,
@ -377,6 +381,43 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
}
}
template <typename VertexAccessor, typename TTag = Tag,
typename TReturnType = std::enable_if_t<std::is_same_v<TTag, StorageTag>, bool>>
TReturnType HasLabelImpl(const VertexAccessor &vertex, const LabelIx &label, StorageTag /*tag*/) {
auto has_label = vertex.HasLabel(view_, GetLabel(label));
if (has_label.HasError() && has_label.GetError() == Error::NONEXISTENT_OBJECT) {
// This is a very nasty and temporary hack in order to make MERGE
// work. The old storage had the following logic when returning an
// `OLD` view: `return old ? old : new`. That means that if the
// `OLD` view didn't exist, it returned the NEW view. With this hack
// we simulate that behavior.
// TODO (mferencevic, teon.banek): Remove once MERGE is
// reimplemented.
has_label = vertex.HasLabel(StorageView::NEW, GetLabel(label));
}
if (has_label.HasError()) {
switch (has_label.GetError()) {
case Error::DELETED_OBJECT:
throw ExpressionRuntimeException("Trying to access labels on a deleted node.");
case Error::NONEXISTENT_OBJECT:
throw ExpressionRuntimeException("Trying to access labels from a node that doesn't exist.");
case Error::SERIALIZATION_ERROR:
case Error::VERTEX_HAS_EDGES:
case Error::PROPERTIES_DISABLED:
throw ExpressionRuntimeException("Unexpected error when accessing labels.");
}
}
return *has_label;
}
template <typename VertexAccessor, typename TTag = Tag,
typename TReturnType = std::enable_if_t<std::is_same_v<TTag, QueryEngineTag>, bool>>
TReturnType HasLabelImpl(const VertexAccessor &vertex, const LabelIx &label_ix, QueryEngineTag /*tag*/) {
auto label = typename VertexAccessor::Label{LabelId::FromUint(label_ix.ix)};
auto has_label = vertex.HasLabel(label);
return !has_label;
}
TypedValue Visit(LabelsTest &labels_test) override {
auto expression_result = labels_test.expression_->Accept(*this);
switch (expression_result.type()) {
@ -384,35 +425,12 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
return TypedValue(ctx_->memory);
case TypedValue::Type::Vertex: {
const auto &vertex = expression_result.ValueVertex();
for (const auto &label : labels_test.labels_) {
auto has_label = vertex.HasLabel(view_, GetLabel(label));
if (has_label.HasError() && has_label.GetError() == Error::NONEXISTENT_OBJECT) {
// This is a very nasty and temporary hack in order to make MERGE
// work. The old storage had the following logic when returning an
// `OLD` view: `return old ? old : new`. That means that if the
// `OLD` view didn't exist, it returned the NEW view. With this hack
// we simulate that behavior.
// TODO (mferencevic, teon.banek): Remove once MERGE is
// reimplemented.
has_label = vertex.HasLabel(StorageView::NEW, GetLabel(label));
}
if (has_label.HasError()) {
switch (has_label.GetError()) {
case Error::DELETED_OBJECT:
throw ExpressionRuntimeException("Trying to access labels on a deleted node.");
case Error::NONEXISTENT_OBJECT:
throw ExpressionRuntimeException("Trying to access labels from a node that doesn't exist.");
case Error::SERIALIZATION_ERROR:
case Error::VERTEX_HAS_EDGES:
case Error::PROPERTIES_DISABLED:
throw ExpressionRuntimeException("Unexpected error when accessing labels.");
}
}
if (!*has_label) {
return TypedValue(false, ctx_->memory);
}
if (std::ranges::all_of(labels_test.labels_, [&vertex, this](const auto label_test) {
return this->HasLabelImpl(vertex, label_test, Tag{});
})) {
return TypedValue(true, ctx_->memory);
}
return TypedValue(true, ctx_->memory);
return TypedValue(false, ctx_->memory);
}
default:
throw ExpressionRuntimeException("Only nodes have labels.");
@ -695,7 +713,24 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
}
private:
template <class TRecordAccessor>
template <class TRecordAccessor, class TTag = Tag,
class TReturnType = std::enable_if_t<std::is_same_v<TTag, QueryEngineTag>, TypedValue>>
TReturnType GetProperty(const TRecordAccessor &record_accessor, PropertyIx prop) {
auto maybe_prop = record_accessor.GetProperty(prop.name);
// Handler non existent property
return conv_(maybe_prop);
}
template <class TRecordAccessor, class TTag = Tag,
class TReturnType = std::enable_if_t<std::is_same_v<TTag, QueryEngineTag>, TypedValue>>
TReturnType GetProperty(const TRecordAccessor &record_accessor, const std::string_view name) {
auto maybe_prop = record_accessor.GetProperty(std::string(name));
// Handler non existent property
return conv_(maybe_prop);
}
template <class TRecordAccessor, class TTag = Tag,
class TReturnType = std::enable_if_t<std::is_same_v<TTag, StorageTag>, TypedValue>>
TypedValue GetProperty(const TRecordAccessor &record_accessor, PropertyIx prop) {
auto maybe_prop = record_accessor.GetProperty(view_, ctx_->properties[prop.ix]);
if (maybe_prop.HasError() && maybe_prop.GetError() == Error::NONEXISTENT_OBJECT) {
@ -722,7 +757,8 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
return conv_(*maybe_prop, ctx_->memory);
}
template <class TRecordAccessor>
template <class TRecordAccessor, class TTag = Tag,
class TReturnType = std::enable_if_t<std::is_same_v<TTag, StorageTag>, TypedValue>>
TypedValue GetProperty(const TRecordAccessor &record_accessor, const std::string_view name) {
auto maybe_prop = record_accessor.GetProperty(view_, dba_->NameToProperty(name));
if (maybe_prop.HasError() && maybe_prop.GetError() == Error::NONEXISTENT_OBJECT) {

View File

@ -116,13 +116,15 @@ class TypedValueT {
return hash;
}
case TypedValueT::Type::Vertex:
return 34;
case TypedValueT::Type::Edge:
return 0;
return 35;
case TypedValueT::Type::Path: {
const auto &vertices = value.ValuePath().vertices();
const auto &edges = value.ValuePath().edges();
return utils::FnvCollection<decltype(vertices), TVertexAccessor>{}(vertices) ^
utils::FnvCollection<decltype(edges), TEdgeAccessor>{}(edges);
// const auto &vertices = value.ValuePath().vertices();
// const auto &edges = value.ValuePath().edges();
// return utils::FnvCollection<decltype(vertices), TVertexAccessor>{}(vertices) ^
// utils::FnvCollection<decltype(edges), TEdgeAccessor>{}(edges);
return 36;
}
case TypedValueT::Type::Date:
return utils::DateHash{}(value.ValueDate());

View File

@ -67,7 +67,7 @@ class Io {
I implementation_;
Address address_;
RequestId request_id_counter_ = 0;
Duration default_timeout_ = std::chrono::microseconds{50000};
Duration default_timeout_ = std::chrono::microseconds{100000};
public:
Io(I io, Address address) : implementation_(io), address_(address) {}

View File

@ -23,17 +23,18 @@ set(mg_query_v2_sources
plan/rewrite/index_lookup.cpp
plan/rule_based_planner.cpp
plan/variable_start_planner.cpp
procedure/mg_procedure_impl.cpp
procedure/mg_procedure_helpers.cpp
procedure/module.cpp
procedure/py_module.cpp
# procedure/mg_procedure_impl.cpp
# procedure/mg_procedure_helpers.cpp
# procedure/module.cpp
# procedure/py_module.cpp
serialization/property_value.cpp
stream/streams.cpp
stream/sources.cpp
stream/common.cpp
trigger.cpp
trigger_context.cpp
bindings/typed_value.cpp)
# stream/streams.cpp
# stream/sources.cpp
# stream/common.cpp
# trigger.cpp
# trigger_context.cpp
bindings/typed_value.cpp
accessors.cpp)
find_package(Boost REQUIRED)

View File

@ -0,0 +1,75 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "query/v2/accessors.hpp"
#include "query/v2/requests.hpp"
namespace memgraph::query::v2::accessors {
EdgeAccessor::EdgeAccessor(Edge edge, std::vector<std::pair<PropertyId, Value>> props)
: edge(std::move(edge)), properties(std::move(props)) {}
uint64_t EdgeAccessor::EdgeType() const { return edge.type.id; }
std::vector<std::pair<PropertyId, Value>> EdgeAccessor::Properties() const {
return properties;
// std::map<std::string, TypedValue> res;
// for (const auto &[name, value] : *properties) {
// res[name] = ValueToTypedValue(value);
// }
// return res;
}
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
Value EdgeAccessor::GetProperty(const std::string & /*prop_name*/) const {
// TODO(kostasrim) fix this
return {};
}
Edge EdgeAccessor::GetEdge() const { return edge; }
VertexAccessor EdgeAccessor::To() const { return VertexAccessor(Vertex{edge.dst}, {}); }
VertexAccessor EdgeAccessor::From() const { return VertexAccessor(Vertex{edge.src}, {}); }
VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props)
: vertex(std::move(v)), properties(std::move(props)) {}
std::vector<Label> VertexAccessor::Labels() const { return vertex.labels; }
bool VertexAccessor::HasLabel(Label &label) const {
return std::find_if(vertex.labels.begin(), vertex.labels.end(),
[label](const auto &l) { return l.id == label.id; }) != vertex.labels.end();
}
std::vector<std::pair<PropertyId, Value>> VertexAccessor::Properties() const {
// std::map<std::string, TypedValue> res;
// for (const auto &[name, value] : *properties) {
// res[name] = ValueToTypedValue(value);
// }
// return res;
return properties;
}
Value VertexAccessor::GetProperty(PropertyId prop_id) const {
return std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; })->second;
// return ValueToTypedValue(properties[prop_name]);
}
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
Value VertexAccessor::GetProperty(const std::string & /*prop_name*/) const {
// TODO(kostasrim) Add string mapping
return {};
// return ValueToTypedValue(properties[prop_name]);
}
msgs::Vertex VertexAccessor::GetVertex() const { return vertex; }
} // namespace memgraph::query::v2::accessors

187
src/query/v2/accessors.hpp Normal file
View File

@ -0,0 +1,187 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <optional>
#include <utility>
#include <vector>
#include "query/exceptions.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/view.hpp"
#include "utils/bound.hpp"
#include "utils/exceptions.hpp"
#include "utils/memory.hpp"
#include "utils/memory_tracker.hpp"
namespace memgraph::query::v2::accessors {
using Value = memgraph::msgs::Value;
using Edge = memgraph::msgs::Edge;
using Vertex = memgraph::msgs::Vertex;
using Label = memgraph::msgs::Label;
using PropertyId = memgraph::msgs::PropertyId;
class VertexAccessor;
class EdgeAccessor final {
public:
EdgeAccessor(Edge edge, std::vector<std::pair<PropertyId, Value>> props);
uint64_t EdgeType() const;
std::vector<std::pair<PropertyId, Value>> Properties() const;
Value GetProperty(const std::string &prop_name) const;
Edge GetEdge() const;
// Dummy function
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
inline size_t CypherId() const { return 10; }
// bool HasSrcAccessor const { return src == nullptr; }
// bool HasDstAccessor const { return dst == nullptr; }
VertexAccessor To() const;
VertexAccessor From() const;
friend bool operator==(const EdgeAccessor &lhs, const EdgeAccessor &rhs) {
return lhs.edge == rhs.edge && lhs.properties == rhs.properties;
}
friend bool operator!=(const EdgeAccessor &lhs, const EdgeAccessor &rhs) { return !(lhs == rhs); }
private:
Edge edge;
std::vector<std::pair<PropertyId, Value>> properties;
};
class VertexAccessor final {
public:
using PropertyId = msgs::PropertyId;
using Label = msgs::Label;
VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props);
std::vector<Label> Labels() const;
bool HasLabel(Label &label) const;
std::vector<std::pair<PropertyId, Value>> Properties() const;
Value GetProperty(PropertyId prop_id) const;
Value GetProperty(const std::string &prop_name) const;
msgs::Vertex GetVertex() const;
// Dummy function
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
inline size_t CypherId() const { return 10; }
// auto InEdges(storage::View view, const std::vector<storage::EdgeTypeId> &edge_types) const
// -> storage::Result<decltype(iter::imap(MakeEdgeAccessor, *impl_.InEdges(view)))> {
// auto maybe_edges = impl_.InEdges(view, edge_types);
// if (maybe_edges.HasError()) return maybe_edges.GetError();
// return iter::imap(MakeEdgeAccessor, std::move(*maybe_edges));
// }
//
// auto InEdges(storage::View view) const { return InEdges(view, {}); }
//
// auto InEdges(storage::View view, const std::vector<storage::EdgeTypeId> &edge_types, const VertexAccessor &dest)
// const
// -> storage::Result<decltype(iter::imap(MakeEdgeAccessor, *impl_.InEdges(view)))> {
// auto maybe_edges = impl_.InEdges(view, edge_types, &dest.impl_);
// if (maybe_edges.HasError()) return maybe_edges.GetError();
// return iter::imap(MakeEdgeAccessor, std::move(*maybe_edges));
// }
//
// auto OutEdges(storage::View view, const std::vector<storage::EdgeTypeId> &edge_types) const
// -> storage::Result<decltype(iter::imap(MakeEdgeAccessor, *impl_.OutEdges(view)))> {
// auto maybe_edges = impl_.OutEdges(view, edge_types);
// if (maybe_edges.HasError()) return maybe_edges.GetError();
// return iter::imap(MakeEdgeAccessor, std::move(*maybe_edges));
// }
//
// auto OutEdges(storage::View view) const { return OutEdges(view, {}); }
//
// auto OutEdges(storage::View view, const std::vector<storage::EdgeTypeId> &edge_types,
// const VertexAccessor &dest) const
// -> storage::Result<decltype(iter::imap(MakeEdgeAccessor, *impl_.OutEdges(view)))> {
// auto maybe_edges = impl_.OutEdges(view, edge_types, &dest.impl_);
// if (maybe_edges.HasError()) return maybe_edges.GetError();
// return iter::imap(MakeEdgeAccessor, std::move(*maybe_edges));
// }
// storage::Result<size_t> InDegree(storage::View view) const { return impl_.InDegree(view); }
//
// storage::Result<size_t> OutDegree(storage::View view) const { return impl_.OutDegree(view); }
//
friend bool operator==(const VertexAccessor &lhs, const VertexAccessor &rhs) {
return lhs.vertex == rhs.vertex && lhs.properties == rhs.properties;
}
friend bool operator!=(const VertexAccessor &lhs, const VertexAccessor &rhs) { return !(lhs == rhs); }
private:
Vertex vertex;
std::vector<std::pair<PropertyId, Value>> properties;
};
// inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }
// inline VertexAccessor EdgeAccessor::From() const { return VertexAccessor(impl_.FromVertex()); }
// Highly mocked interface. Won't work if used.
class Path {
public:
// Empty for now
explicit Path(const VertexAccessor & /*vertex*/, utils::MemoryResource *memory = utils::NewDeleteResource())
: mem(memory) {}
template <typename... TOthers>
explicit Path(const VertexAccessor &vertex, const TOthers &...others) {}
template <typename... TOthers>
Path(std::allocator_arg_t /*unused*/, utils::MemoryResource *memory, const VertexAccessor &vertex,
const TOthers &...others) {}
Path(const Path & /*other*/) {}
Path(const Path & /*other*/, utils::MemoryResource *memory) : mem(memory) {}
Path(Path && /*other*/) noexcept {}
Path(Path && /*other*/, utils::MemoryResource *memory) : mem(memory) {}
Path &operator=(const Path &path) {
if (this == &path) {
return *this;
}
return *this;
}
Path &operator=(Path &&path) noexcept {
if (this == &path) {
return *this;
}
return *this;
}
~Path() {}
friend bool operator==(const Path & /*lhs*/, const Path & /*rhs*/) { return true; };
utils::MemoryResource *GetMemoryResource() { return mem; }
private:
utils::MemoryResource *mem = utils::NewDeleteResource();
};
} // namespace memgraph::query::v2::accessors

View File

@ -16,25 +16,29 @@
#include "expr/interpret/eval.hpp"
#include "query/v2/bindings/typed_value.hpp"
#include "query/v2/context.hpp"
#include "query/v2/conversions.hpp"
#include "query/v2/db_accessor.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/conversions.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_store.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/view.hpp"
namespace memgraph::query::v2 {
struct PropertyToTypedValueConverter {
TypedValue operator()(const auto &val) { return memgraph::storage::v3::PropertyToTypedValue<TypedValue>(val); }
TypedValue operator()(const auto &val, utils::MemoryResource *mem) {
return memgraph::storage::v3::PropertyToTypedValue<TypedValue>(val, mem);
}
inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); };
namespace detail {
class Callable {
public:
auto operator()(const memgraph::storage::v3::PropertyValue &val) const {
return memgraph::storage::v3::PropertyToTypedValue<TypedValue>(val);
};
auto operator()(const msgs::Value &val) const { return ValueToTypedValue(val); };
};
} // namespace detail
using ExpressionEvaluator =
memgraph::expr::ExpressionEvaluator<TypedValue, EvaluationContext, DbAccessor, storage::v3::View,
storage::v3::LabelId, storage::v3::PropertyStore, PropertyToTypedValueConverter,
memgraph::storage::v3::Error>;
memgraph::expr::ExpressionEvaluator<TypedValue, memgraph::query::v2::EvaluationContext, DbAccessor,
storage::v3::View, storage::v3::LabelId, msgs::Value, detail::Callable,
memgraph::storage::v3::Error, memgraph::expr::QueryEngineTag>;
} // namespace memgraph::query::v2

View File

@ -10,10 +10,10 @@
// licenses/APL.txt.
#include "expr/typed_value.hpp"
#include "query/v2/db_accessor.hpp"
#include "query/v2/accessors.hpp"
#include "query/v2/path.hpp"
namespace memgraph::expr {
namespace v2 = memgraph::query::v2;
template class TypedValueT<v2::VertexAccessor, v2::EdgeAccessor, v2::Path>;
template class TypedValueT<v2::accessors::VertexAccessor, v2::accessors::EdgeAccessor, v2::accessors::Path>;
} // namespace memgraph::expr

View File

@ -14,13 +14,14 @@
#include "query/v2/bindings/bindings.hpp"
#include "expr/typed_value.hpp"
#include "query/v2/db_accessor.hpp"
#include "query/v2/path.hpp"
#include "query/v2/accessors.hpp"
namespace memgraph::expr {
namespace v2 = memgraph::query::v2;
extern template class memgraph::expr::TypedValueT<v2::VertexAccessor, v2::EdgeAccessor, v2::Path>;
extern template class memgraph::expr::TypedValueT<v2::accessors::VertexAccessor, v2::accessors::EdgeAccessor,
v2::accessors::Path>;
} // namespace memgraph::expr
namespace memgraph::query::v2 {
using TypedValue = memgraph::expr::TypedValueT<VertexAccessor, EdgeAccessor, Path>;
using TypedValue =
memgraph::expr::TypedValueT<v2::accessors::VertexAccessor, v2::accessors::EdgeAccessor, v2::accessors::Path>;
} // namespace memgraph::query::v2

View File

@ -18,7 +18,8 @@
#include "query/v2/metadata.hpp"
#include "query/v2/parameters.hpp"
#include "query/v2/plan/profile.hpp"
#include "query/v2/trigger.hpp"
//#include "query/v2/trigger.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "utils/async_timer.hpp"
namespace memgraph::query::v2 {
@ -70,8 +71,9 @@ struct ExecutionContext {
plan::ProfilingStats stats;
plan::ProfilingStats *stats_root{nullptr};
ExecutionStats execution_stats;
TriggerContextCollector *trigger_context_collector{nullptr};
// TriggerContextCollector *trigger_context_collector{nullptr};
utils::AsyncTimer timer;
std::unique_ptr<msgs::ShardRequestManagerInterface> shard_request_manager{nullptr};
};
static_assert(std::is_move_assignable_v<ExecutionContext>, "ExecutionContext must be move assignable!");

View File

@ -0,0 +1,100 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "bindings/typed_value.hpp"
#include "query/v2/accessors.hpp"
#include "query/v2/requests.hpp"
namespace memgraph::query::v2 {
inline TypedValue ValueToTypedValue(const msgs::Value &value) {
using Value = msgs::Value;
switch (value.type) {
case Value::Type::Null:
return {};
case Value::Type::Bool:
return TypedValue(value.bool_v);
case Value::Type::Int64:
return TypedValue(value.int_v);
case Value::Type::Double:
return TypedValue(value.double_v);
case Value::Type::String:
return TypedValue(value.string_v);
case Value::Type::List: {
const auto &lst = value.list_v;
std::vector<TypedValue> dst;
dst.reserve(lst.size());
for (const auto &elem : lst) {
dst.push_back(ValueToTypedValue(elem));
}
return TypedValue(std::move(dst));
}
case Value::Type::Map: {
const auto &value_map = value.map_v;
std::map<std::string, TypedValue> dst;
for (const auto &[key, val] : value_map) {
dst[key] = ValueToTypedValue(val);
}
return TypedValue(std::move(dst));
}
case Value::Type::Vertex:
return TypedValue(accessors::VertexAccessor(value.vertex_v, {}));
case Value::Type::Edge:
return TypedValue(accessors::EdgeAccessor(value.edge_v, {}));
case Value::Type::Path:
break;
}
throw std::runtime_error("Incorrect type in conversion");
}
inline msgs::Value TypedValueToValue(const TypedValue &value) {
using Value = msgs::Value;
switch (value.type()) {
case TypedValue::Type::Null:
return {};
case TypedValue::Type::Bool:
return Value(value.ValueBool());
case TypedValue::Type::Int:
return Value(value.ValueInt());
case TypedValue::Type::Double:
return Value(value.ValueDouble());
case TypedValue::Type::String:
return Value(std::string(value.ValueString()));
case TypedValue::Type::List: {
const auto &lst = value.ValueList();
std::vector<Value> dst;
dst.reserve(lst.size());
for (const auto &elem : lst) {
dst.push_back(TypedValueToValue(elem));
}
return Value(std::move(dst));
}
case TypedValue::Type::Map: {
const auto &value_map = value.ValueMap();
std::map<std::string, Value> dst;
for (const auto &[key, val] : value_map) {
dst[std::string(key)] = TypedValueToValue(val);
}
return Value(std::move(dst));
}
case TypedValue::Type::Vertex:
return Value(value.ValueVertex().GetVertex());
case TypedValue::Type::Edge:
return Value(value.ValueEdge().GetEdge());
case TypedValue::Type::Path:
default:
break;
}
throw std::runtime_error("Incorrect type in conversion");
}
} // namespace memgraph::query::v2

View File

@ -11,7 +11,6 @@
#include "query/v2/bindings/ast_visitor.hpp"
#include "query/v2/frontend/ast/ast.hpp"
#include "query/v2/procedure/module.hpp"
#include "utils/memory.hpp"
namespace memgraph::query::v2 {
@ -86,12 +85,7 @@ class PrivilegeExtractor : public QueryVisitor<void>, public HierarchicalTreeVis
AddPrivilege(AuthQuery::Privilege::CREATE);
return false;
}
bool PreVisit(CallProcedure &procedure) override {
const auto maybe_proc =
procedure::FindProcedure(procedure::gModuleRegistry, procedure.procedure_name_, utils::NewDeleteResource());
if (maybe_proc && maybe_proc->second->info.required_privilege) {
AddPrivilege(*maybe_proc->second->info.required_privilege);
}
bool PreVisit(CallProcedure & /*procedure*/) override {
return false;
}
bool PreVisit(Delete & /*unused*/) override {

View File

@ -21,11 +21,12 @@
#include <type_traits>
#include "query/v2/bindings/typed_value.hpp"
#include "query/v2/conversions.hpp"
#include "query/v2/db_accessor.hpp"
#include "query/v2/exceptions.hpp"
#include "query/v2/procedure/cypher_types.hpp"
#include "query/v2/procedure/mg_procedure_impl.hpp"
#include "query/v2/procedure/module.hpp"
//#include "query/v2/procedure/mg_procedure_impl.hpp"
//#include "query/v2/procedure/module.hpp"
#include "storage/v3/conversions.hpp"
#include "utils/string.hpp"
#include "utils/temporal.hpp"
@ -387,39 +388,7 @@ TypedValue Last(const TypedValue *args, int64_t nargs, const FunctionContext &ct
return TypedValue(list.back(), ctx.memory);
}
TypedValue Properties(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
FType<Or<Null, Vertex, Edge>>("properties", args, nargs);
auto *dba = ctx.db_accessor;
auto get_properties = [&](const auto &record_accessor) {
TypedValue::TMap properties(ctx.memory);
auto maybe_props = record_accessor.Properties(ctx.view);
if (maybe_props.HasError()) {
switch (maybe_props.GetError()) {
case storage::v3::Error::DELETED_OBJECT:
throw QueryRuntimeException("Trying to get properties from a deleted object.");
case storage::v3::Error::NONEXISTENT_OBJECT:
throw query::v2::QueryRuntimeException("Trying to get properties from an object that doesn't exist.");
case storage::v3::Error::SERIALIZATION_ERROR:
case storage::v3::Error::VERTEX_HAS_EDGES:
case storage::v3::Error::PROPERTIES_DISABLED:
throw QueryRuntimeException("Unexpected error when getting properties.");
}
}
for (const auto &property : *maybe_props) {
properties.emplace(dba->PropertyToName(property.first),
storage::v3::PropertyToTypedValue<TypedValue>(property.second));
}
return TypedValue(std::move(properties));
};
const auto &value = args[0];
if (value.IsNull()) {
return TypedValue(ctx.memory);
} else if (value.IsVertex()) {
return get_properties(value.ValueVertex());
} else {
return get_properties(value.ValueEdge());
}
}
TypedValue Properties(const TypedValue * /*args*/, int64_t /*nargs*/, const FunctionContext & /*ctx*/) { return {}; }
TypedValue Size(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
FType<Or<Null, List, String, Map, Path>>("size", args, nargs);
@ -435,7 +404,8 @@ TypedValue Size(const TypedValue *args, int64_t nargs, const FunctionContext &ct
// to do it.
return TypedValue(static_cast<int64_t>(value.ValueMap().size()), ctx.memory);
} else {
return TypedValue(static_cast<int64_t>(value.ValuePath().edges().size()), ctx.memory);
// TODO(kostasrim) Fix the dummy return
return TypedValue(int64_t(0), ctx.memory);
}
}
@ -469,25 +439,22 @@ TypedValue Degree(const TypedValue *args, int64_t nargs, const FunctionContext &
FType<Or<Null, Vertex>>("degree", args, nargs);
if (args[0].IsNull()) return TypedValue(ctx.memory);
const auto &vertex = args[0].ValueVertex();
size_t out_degree = UnwrapDegreeResult(vertex.OutDegree(ctx.view));
size_t in_degree = UnwrapDegreeResult(vertex.InDegree(ctx.view));
return TypedValue(static_cast<int64_t>(out_degree + in_degree), ctx.memory);
// TODO(kostasrim) Fix dummy values
return TypedValue(int64_t(0), ctx.memory);
}
TypedValue InDegree(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
FType<Or<Null, Vertex>>("inDegree", args, nargs);
if (args[0].IsNull()) return TypedValue(ctx.memory);
const auto &vertex = args[0].ValueVertex();
size_t in_degree = UnwrapDegreeResult(vertex.InDegree(ctx.view));
return TypedValue(static_cast<int64_t>(in_degree), ctx.memory);
return TypedValue(int64_t(0), ctx.memory);
}
TypedValue OutDegree(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
FType<Or<Null, Vertex>>("outDegree", args, nargs);
if (args[0].IsNull()) return TypedValue(ctx.memory);
const auto &vertex = args[0].ValueVertex();
size_t out_degree = UnwrapDegreeResult(vertex.OutDegree(ctx.view));
return TypedValue(static_cast<int64_t>(out_degree), ctx.memory);
return TypedValue(int64_t(0), ctx.memory);
}
TypedValue ToBoolean(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
@ -553,7 +520,7 @@ TypedValue Type(const TypedValue *args, int64_t nargs, const FunctionContext &ct
FType<Or<Null, Edge>>("type", args, nargs);
auto *dba = ctx.db_accessor;
if (args[0].IsNull()) return TypedValue(ctx.memory);
return TypedValue(dba->EdgeTypeToName(args[0].ValueEdge().EdgeType()), ctx.memory);
return TypedValue(static_cast<int64_t>(args[0].ValueEdge().EdgeType()), ctx.memory);
}
TypedValue ValueType(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
@ -593,81 +560,27 @@ TypedValue ValueType(const TypedValue *args, int64_t nargs, const FunctionContex
}
// TODO: How is Keys different from Properties function?
TypedValue Keys(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
TypedValue Keys(const TypedValue *args, int64_t nargs, const FunctionContext & /*ctx*/) {
FType<Or<Null, Vertex, Edge>>("keys", args, nargs);
auto *dba = ctx.db_accessor;
auto get_keys = [&](const auto &record_accessor) {
TypedValue::TVector keys(ctx.memory);
auto maybe_props = record_accessor.Properties(ctx.view);
if (maybe_props.HasError()) {
switch (maybe_props.GetError()) {
case storage::v3::Error::DELETED_OBJECT:
throw QueryRuntimeException("Trying to get keys from a deleted object.");
case storage::v3::Error::NONEXISTENT_OBJECT:
throw query::v2::QueryRuntimeException("Trying to get keys from an object that doesn't exist.");
case storage::v3::Error::SERIALIZATION_ERROR:
case storage::v3::Error::VERTEX_HAS_EDGES:
case storage::v3::Error::PROPERTIES_DISABLED:
throw QueryRuntimeException("Unexpected error when getting keys.");
}
}
for (const auto &property : *maybe_props) {
keys.emplace_back(dba->PropertyToName(property.first));
}
return TypedValue(std::move(keys));
};
const auto &value = args[0];
if (value.IsNull()) {
return TypedValue(ctx.memory);
} else if (value.IsVertex()) {
return get_keys(value.ValueVertex());
} else {
return get_keys(value.ValueEdge());
}
return {};
}
TypedValue Labels(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
FType<Or<Null, Vertex>>("labels", args, nargs);
auto *dba = ctx.db_accessor;
if (args[0].IsNull()) return TypedValue(ctx.memory);
TypedValue::TVector labels(ctx.memory);
auto maybe_labels = args[0].ValueVertex().Labels(ctx.view);
if (maybe_labels.HasError()) {
switch (maybe_labels.GetError()) {
case storage::v3::Error::DELETED_OBJECT:
throw QueryRuntimeException("Trying to get labels from a deleted node.");
case storage::v3::Error::NONEXISTENT_OBJECT:
throw query::v2::QueryRuntimeException("Trying to get labels from a node that doesn't exist.");
case storage::v3::Error::SERIALIZATION_ERROR:
case storage::v3::Error::VERTEX_HAS_EDGES:
case storage::v3::Error::PROPERTIES_DISABLED:
throw QueryRuntimeException("Unexpected error when getting labels.");
}
}
for (const auto &label : *maybe_labels) {
labels.emplace_back(dba->LabelToName(label));
}
return TypedValue(std::move(labels));
return {};
}
TypedValue Nodes(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
FType<Or<Null, Path>>("nodes", args, nargs);
if (args[0].IsNull()) return TypedValue(ctx.memory);
const auto &vertices = args[0].ValuePath().vertices();
TypedValue::TVector values(ctx.memory);
values.reserve(vertices.size());
for (const auto &v : vertices) values.emplace_back(v);
return TypedValue(std::move(values));
return {};
}
TypedValue Relationships(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
FType<Or<Null, Path>>("relationships", args, nargs);
if (args[0].IsNull()) return TypedValue(ctx.memory);
const auto &edges = args[0].ValuePath().edges();
TypedValue::TVector values(ctx.memory);
values.reserve(edges.size());
for (const auto &e : edges) values.emplace_back(e);
return TypedValue(std::move(values));
return {};
}
TypedValue Range(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
@ -877,9 +790,9 @@ TypedValue Id(const TypedValue *args, int64_t nargs, const FunctionContext &ctx)
if (arg.IsNull()) {
return TypedValue(ctx.memory);
} else if (arg.IsVertex()) {
return TypedValue(arg.ValueVertex().CypherId(), ctx.memory);
return TypedValue(int64_t(arg.ValueVertex().CypherId()), ctx.memory);
} else {
return TypedValue(arg.ValueEdge().CypherId(), ctx.memory);
return TypedValue(int64_t(arg.ValueEdge().CypherId()), ctx.memory);
}
}
@ -1180,49 +1093,6 @@ TypedValue Duration(const TypedValue *args, int64_t nargs, const FunctionContext
return TypedValue(utils::Duration(duration_parameters), ctx.memory);
}
std::function<TypedValue(const TypedValue *, const int64_t, const FunctionContext &)> UserFunction(
const mgp_func &func, const std::string &fully_qualified_name) {
return [func, fully_qualified_name](const TypedValue *args, int64_t nargs, const FunctionContext &ctx) -> TypedValue {
/// Find function is called to aquire the lock on Module pointer while user-defined function is executed
const auto &maybe_found =
procedure::FindFunction(procedure::gModuleRegistry, fully_qualified_name, utils::NewDeleteResource());
if (!maybe_found) {
throw QueryRuntimeException(
"Function '{}' has been unloaded. Please check query modules to confirm that function is loaded in Memgraph.",
fully_qualified_name);
}
const auto &func_cb = func.cb;
mgp_memory memory{ctx.memory};
mgp_func_context functx{ctx.db_accessor, ctx.view};
auto graph = mgp_graph::NonWritableGraph(*ctx.db_accessor, ctx.view);
std::vector<TypedValue> args_list;
args_list.reserve(nargs);
for (std::size_t i = 0; i < nargs; ++i) {
args_list.emplace_back(args[i]);
}
auto function_argument_list = mgp_list(ctx.memory);
procedure::ConstructArguments(args_list, func, fully_qualified_name, function_argument_list, graph);
mgp_func_result maybe_res;
func_cb(&function_argument_list, &functx, &maybe_res, &memory);
if (maybe_res.error_msg) {
throw QueryRuntimeException(*maybe_res.error_msg);
}
if (!maybe_res.value) {
throw QueryRuntimeException(
"Function '{}' didn't set the result nor the error message. Please either set the result by using "
"mgp_func_result_set_value or the error by using mgp_func_result_set_error_msg.",
fully_qualified_name);
}
return {*(maybe_res.value), ctx.memory};
};
}
} // namespace
std::function<TypedValue(const TypedValue *, int64_t, const FunctionContext &ctx)> NameToFunction(
@ -1309,14 +1179,6 @@ std::function<TypedValue(const TypedValue *, int64_t, const FunctionContext &ctx
if (function_name == "LOCALDATETIME") return LocalDateTime;
if (function_name == "DURATION") return Duration;
const auto &maybe_found =
procedure::FindFunction(procedure::gModuleRegistry, function_name, utils::NewDeleteResource());
if (maybe_found) {
const auto *func = (*maybe_found).second;
return UserFunction(*func, function_name);
}
return nullptr;
}

View File

@ -41,8 +41,6 @@
#include "query/v2/plan/planner.hpp"
#include "query/v2/plan/profile.hpp"
#include "query/v2/plan/vertex_count_cache.hpp"
#include "query/v2/stream/common.hpp"
#include "query/v2/trigger.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/storage.hpp"
@ -461,14 +459,6 @@ std::optional<std::string> StringPointerToOptional(const std::string *str) {
return str == nullptr ? std::nullopt : std::make_optional(*str);
}
stream::CommonStreamInfo GetCommonStreamInfo(StreamQuery *stream_query, ExpressionEvaluator &evaluator) {
return {
.batch_interval = GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator)
.value_or(stream::kDefaultBatchInterval),
.batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator).value_or(stream::kDefaultBatchSize),
.transformation_name = stream_query->transform_name_};
}
std::vector<std::string> EvaluateTopicNames(ExpressionEvaluator &evaluator,
std::variant<Expression *, std::vector<std::string>> topic_variant) {
return std::visit(utils::Overloaded{[&](Expression *expression) {
@ -480,214 +470,6 @@ std::vector<std::string> EvaluateTopicNames(ExpressionEvaluator &evaluator,
std::move(topic_variant));
}
Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, ExpressionEvaluator &evaluator,
InterpreterContext *interpreter_context,
const std::string *username) {
static constexpr std::string_view kDefaultConsumerGroup = "mg_consumer";
std::string consumer_group{stream_query->consumer_group_.empty() ? kDefaultConsumerGroup
: stream_query->consumer_group_};
auto bootstrap = GetOptionalStringValue(stream_query->bootstrap_servers_, evaluator);
if (bootstrap && bootstrap->empty()) {
throw SemanticException("Bootstrap servers must not be an empty string!");
}
auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
const auto get_config_map = [&evaluator](std::unordered_map<Expression *, Expression *> map,
std::string_view map_name) -> std::unordered_map<std::string, std::string> {
std::unordered_map<std::string, std::string> config_map;
for (const auto [key_expr, value_expr] : map) {
const auto key = key_expr->Accept(evaluator);
const auto value = value_expr->Accept(evaluator);
if (!key.IsString() || !value.IsString()) {
throw SemanticException("{} must contain only string keys and values!", map_name);
}
config_map.emplace(key.ValueString(), value.ValueString());
}
return config_map;
};
return [interpreter_context, stream_name = stream_query->stream_name_,
topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_),
consumer_group = std::move(consumer_group), common_stream_info = std::move(common_stream_info),
bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username),
configs = get_config_map(stream_query->configs_, "Configs"),
credentials = get_config_map(stream_query->credentials_, "Credentials")]() mutable {
std::string bootstrap = bootstrap_servers
? std::move(*bootstrap_servers)
: std::string{interpreter_context->config.default_kafka_bootstrap_servers};
interpreter_context->streams.Create<query::v2::stream::KafkaStream>(stream_name,
{.common_info = std::move(common_stream_info),
.topics = std::move(topic_names),
.consumer_group = std::move(consumer_group),
.bootstrap_servers = std::move(bootstrap),
.configs = std::move(configs),
.credentials = std::move(credentials)},
std::move(owner));
return std::vector<std::vector<TypedValue>>{};
};
}
Callback::CallbackFunction GetPulsarCreateCallback(StreamQuery *stream_query, ExpressionEvaluator &evaluator,
InterpreterContext *interpreter_context,
const std::string *username) {
auto service_url = GetOptionalStringValue(stream_query->service_url_, evaluator);
if (service_url && service_url->empty()) {
throw SemanticException("Service URL must not be an empty string!");
}
auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
return [interpreter_context, stream_name = stream_query->stream_name_,
topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_),
common_stream_info = std::move(common_stream_info), service_url = std::move(service_url),
owner = StringPointerToOptional(username)]() mutable {
std::string url =
service_url ? std::move(*service_url) : std::string{interpreter_context->config.default_pulsar_service_url};
interpreter_context->streams.Create<query::v2::stream::PulsarStream>(
stream_name,
{.common_info = std::move(common_stream_info), .topics = std::move(topic_names), .service_url = std::move(url)},
std::move(owner));
return std::vector<std::vector<TypedValue>>{};
};
}
Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &parameters,
InterpreterContext *interpreter_context, DbAccessor *db_accessor,
const std::string *username, std::vector<Notification> *notifications) {
expr::Frame<TypedValue> frame(0);
SymbolTable symbol_table;
EvaluationContext evaluation_context;
// TODO: MemoryResource for EvaluationContext, it should probably be passed as
// the argument to Callback.
evaluation_context.timestamp = QueryTimestamp();
evaluation_context.parameters = parameters;
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
Callback callback;
switch (stream_query->action_) {
case StreamQuery::Action::CREATE_STREAM: {
EventCounter::IncrementCounter(EventCounter::StreamsCreated);
switch (stream_query->type_) {
case StreamQuery::Type::KAFKA:
callback.fn = GetKafkaCreateCallback(stream_query, evaluator, interpreter_context, username);
break;
case StreamQuery::Type::PULSAR:
callback.fn = GetPulsarCreateCallback(stream_query, evaluator, interpreter_context, username);
break;
}
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CREATE_STREAM,
fmt::format("Created stream {}.", stream_query->stream_name_));
return callback;
}
case StreamQuery::Action::START_STREAM: {
const auto batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator);
const auto timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator);
if (batch_limit.has_value()) {
if (batch_limit.value() < 0) {
throw utils::BasicException("Parameter BATCH_LIMIT cannot hold negative value");
}
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_, batch_limit, timeout]() {
interpreter_context->streams.StartWithLimit(stream_name, static_cast<uint64_t>(batch_limit.value()), timeout);
return std::vector<std::vector<TypedValue>>{};
};
} else {
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() {
interpreter_context->streams.Start(stream_name);
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_STREAM,
fmt::format("Started stream {}.", stream_query->stream_name_));
}
return callback;
}
case StreamQuery::Action::START_ALL_STREAMS: {
callback.fn = [interpreter_context]() {
interpreter_context->streams.StartAll();
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_ALL_STREAMS, "Started all streams.");
return callback;
}
case StreamQuery::Action::STOP_STREAM: {
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() {
interpreter_context->streams.Stop(stream_name);
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::STOP_STREAM,
fmt::format("Stopped stream {}.", stream_query->stream_name_));
return callback;
}
case StreamQuery::Action::STOP_ALL_STREAMS: {
callback.fn = [interpreter_context]() {
interpreter_context->streams.StopAll();
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::STOP_ALL_STREAMS, "Stopped all streams.");
return callback;
}
case StreamQuery::Action::DROP_STREAM: {
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() {
interpreter_context->streams.Drop(stream_name);
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::DROP_STREAM,
fmt::format("Dropped stream {}.", stream_query->stream_name_));
return callback;
}
case StreamQuery::Action::SHOW_STREAMS: {
callback.header = {"name", "type", "batch_interval", "batch_size", "transformation_name", "owner", "is running"};
callback.fn = [interpreter_context]() {
auto streams_status = interpreter_context->streams.GetStreamInfo();
std::vector<std::vector<TypedValue>> results;
results.reserve(streams_status.size());
auto stream_info_as_typed_stream_info_emplace_in = [](auto &typed_status, const auto &stream_info) {
typed_status.emplace_back(stream_info.batch_interval.count());
typed_status.emplace_back(stream_info.batch_size);
typed_status.emplace_back(stream_info.transformation_name);
};
for (const auto &status : streams_status) {
std::vector<TypedValue> typed_status;
typed_status.reserve(7);
typed_status.emplace_back(status.name);
typed_status.emplace_back(StreamSourceTypeToString(status.type));
stream_info_as_typed_stream_info_emplace_in(typed_status, status.info);
if (status.owner.has_value()) {
typed_status.emplace_back(*status.owner);
} else {
typed_status.emplace_back();
}
typed_status.emplace_back(status.is_running);
results.push_back(std::move(typed_status));
}
return results;
};
return callback;
}
case StreamQuery::Action::CHECK_STREAM: {
callback.header = {"queries", "raw messages"};
const auto batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator);
if (batch_limit.has_value() && batch_limit.value() < 0) {
throw utils::BasicException("Parameter BATCH_LIMIT cannot hold negative value");
}
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_,
timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator),
batch_limit]() mutable {
return interpreter_context->streams.Check(stream_name, timeout, batch_limit);
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CHECK_STREAM,
fmt::format("Checked stream {}.", stream_query->stream_name_));
return callback;
}
}
}
Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters &parameters, DbAccessor *db_accessor) {
expr::Frame<TypedValue> frame(0);
SymbolTable symbol_table;
@ -889,7 +671,7 @@ struct PullPlanVector {
struct PullPlan {
explicit PullPlan(std::shared_ptr<CachedPlan> plan, const Parameters &parameters, bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
TriggerContextCollector *trigger_context_collector = nullptr,
// TriggerContextCollector *trigger_context_collector = nullptr,
std::optional<size_t> memory_limit = {});
std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n,
const std::vector<Symbol> &output_symbols,
@ -918,7 +700,8 @@ struct PullPlan {
PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &parameters, const bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
TriggerContextCollector *trigger_context_collector, const std::optional<size_t> memory_limit)
const std::optional<size_t> memory_limit)
// TriggerContextCollector *trigger_context_collector, const std::optional<size_t> memory_limit)
: plan_(plan),
cursor_(plan->plan().MakeCursor(execution_memory)),
frame_(plan->symbol_table().max_position(), execution_memory),
@ -934,7 +717,7 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
}
ctx_.is_shutting_down = &interpreter_context->is_shutting_down;
ctx_.is_profile_query = is_profile_query;
ctx_.trigger_context_collector = trigger_context_collector;
// ctx_.trigger_context_collector = trigger_context_collector;
}
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
@ -1031,7 +814,9 @@ using RWType = plan::ReadWriteTypeChecker::RWType;
InterpreterContext::InterpreterContext(storage::v3::Shard *db, const InterpreterConfig config,
const std::filesystem::path &data_directory)
: db(db), trigger_store(data_directory / "triggers"), config(config), streams{this, data_directory / "streams"} {}
// : db(db), trigger_store(data_directory / "triggers"), config(config), streams{this, data_directory /
// "streams"} {}
: db(db), config(config) {}
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
@ -1052,9 +837,9 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
interpreter_context_->db->Access(coordinator::Hlc{}, GetIsolationLevelOverride()));
execution_db_accessor_.emplace(db_accessor_.get());
if (interpreter_context_->trigger_store.HasTriggers()) {
trigger_context_collector_.emplace(interpreter_context_->trigger_store.GetEventTypes());
}
// if (interpreter_context_->trigger_store.HasTriggers()) {
// trigger_context_collector_.emplace(interpreter_context_->trigger_store.GetEventTypes());
// }
};
} else if (query_upper == "COMMIT") {
handler = [this] {
@ -1101,8 +886,8 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
InterpreterContext *interpreter_context, DbAccessor *dba,
utils::MemoryResource *execution_memory, std::vector<Notification> *notifications,
TriggerContextCollector *trigger_context_collector = nullptr) {
utils::MemoryResource *execution_memory, std::vector<Notification> *notifications) {
// TriggerContextCollector *trigger_context_collector = nullptr) {
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
expr::Frame<TypedValue> frame(0);
@ -1147,7 +932,8 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
utils::FindOr(parsed_query.stripped_query.named_expressions(), symbol.token_position(), symbol.name()).first);
}
auto pull_plan = std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context,
execution_memory, trigger_context_collector, memory_limit);
execution_memory, memory_limit);
// execution_memory, trigger_context_collector, memory_limit);
return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges),
[pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary](
AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
@ -1272,7 +1058,7 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
// No output symbols are given so that nothing is streamed.
if (!stats_and_total_time) {
stats_and_total_time = PullPlan(plan, parameters, true, dba, interpreter_context,
execution_memory, nullptr, memory_limit)
execution_memory, memory_limit)
.Pull(stream, {}, {}, summary);
pull_plan = std::make_shared<PullPlanVector>(ProfilingStatsToTable(*stats_and_total_time));
}
@ -1489,170 +1275,6 @@ PreparedQuery PrepareFreeMemoryQuery(ParsedQuery parsed_query, const bool in_exp
RWType::NONE};
}
TriggerEventType ToTriggerEventType(const TriggerQuery::EventType event_type) {
switch (event_type) {
case TriggerQuery::EventType::ANY:
return TriggerEventType::ANY;
case TriggerQuery::EventType::CREATE:
return TriggerEventType::CREATE;
case TriggerQuery::EventType::VERTEX_CREATE:
return TriggerEventType::VERTEX_CREATE;
case TriggerQuery::EventType::EDGE_CREATE:
return TriggerEventType::EDGE_CREATE;
case TriggerQuery::EventType::DELETE:
return TriggerEventType::DELETE;
case TriggerQuery::EventType::VERTEX_DELETE:
return TriggerEventType::VERTEX_DELETE;
case TriggerQuery::EventType::EDGE_DELETE:
return TriggerEventType::EDGE_DELETE;
case TriggerQuery::EventType::UPDATE:
return TriggerEventType::UPDATE;
case TriggerQuery::EventType::VERTEX_UPDATE:
return TriggerEventType::VERTEX_UPDATE;
case TriggerQuery::EventType::EDGE_UPDATE:
return TriggerEventType::EDGE_UPDATE;
}
}
Callback CreateTrigger(TriggerQuery *trigger_query,
const std::map<std::string, storage::v3::PropertyValue> &user_parameters,
InterpreterContext *interpreter_context, DbAccessor *dba, std::optional<std::string> owner) {
return {
{},
[trigger_name = std::move(trigger_query->trigger_name_), trigger_statement = std::move(trigger_query->statement_),
event_type = trigger_query->event_type_, before_commit = trigger_query->before_commit_, interpreter_context, dba,
user_parameters, owner = std::move(owner)]() mutable -> std::vector<std::vector<TypedValue>> {
interpreter_context->trigger_store.AddTrigger(
std::move(trigger_name), trigger_statement, user_parameters, ToTriggerEventType(event_type),
before_commit ? TriggerPhase::BEFORE_COMMIT : TriggerPhase::AFTER_COMMIT, &interpreter_context->ast_cache,
dba, interpreter_context->config.query, std::move(owner), interpreter_context->auth_checker);
return {};
}};
}
Callback DropTrigger(TriggerQuery *trigger_query, InterpreterContext *interpreter_context) {
return {{},
[trigger_name = std::move(trigger_query->trigger_name_),
interpreter_context]() -> std::vector<std::vector<TypedValue>> {
interpreter_context->trigger_store.DropTrigger(trigger_name);
return {};
}};
}
Callback ShowTriggers(InterpreterContext *interpreter_context) {
return {{"trigger name", "statement", "event type", "phase", "owner"}, [interpreter_context] {
std::vector<std::vector<TypedValue>> results;
auto trigger_infos = interpreter_context->trigger_store.GetTriggerInfo();
results.reserve(trigger_infos.size());
for (auto &trigger_info : trigger_infos) {
std::vector<TypedValue> typed_trigger_info;
typed_trigger_info.reserve(4);
typed_trigger_info.emplace_back(std::move(trigger_info.name));
typed_trigger_info.emplace_back(std::move(trigger_info.statement));
typed_trigger_info.emplace_back(TriggerEventTypeToString(trigger_info.event_type));
typed_trigger_info.emplace_back(trigger_info.phase == TriggerPhase::BEFORE_COMMIT ? "BEFORE COMMIT"
: "AFTER COMMIT");
typed_trigger_info.emplace_back(trigger_info.owner.has_value() ? TypedValue{*trigger_info.owner}
: TypedValue{});
results.push_back(std::move(typed_trigger_info));
}
return results;
}};
}
PreparedQuery PrepareTriggerQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
DbAccessor *dba,
const std::map<std::string, storage::v3::PropertyValue> &user_parameters,
const std::string *username) {
if (in_explicit_transaction) {
throw TriggerModificationInMulticommandTxException();
}
auto *trigger_query = utils::Downcast<TriggerQuery>(parsed_query.query);
MG_ASSERT(trigger_query);
std::optional<Notification> trigger_notification;
auto callback = std::invoke([trigger_query, interpreter_context, dba, &user_parameters,
owner = StringPointerToOptional(username), &trigger_notification]() mutable {
switch (trigger_query->action_) {
case TriggerQuery::Action::CREATE_TRIGGER:
trigger_notification.emplace(SeverityLevel::INFO, NotificationCode::CREATE_TRIGGER,
fmt::format("Created trigger {}.", trigger_query->trigger_name_));
EventCounter::IncrementCounter(EventCounter::TriggersCreated);
return CreateTrigger(trigger_query, user_parameters, interpreter_context, dba, std::move(owner));
case TriggerQuery::Action::DROP_TRIGGER:
trigger_notification.emplace(SeverityLevel::INFO, NotificationCode::DROP_TRIGGER,
fmt::format("Dropped trigger {}.", trigger_query->trigger_name_));
return DropTrigger(trigger_query, interpreter_context);
case TriggerQuery::Action::SHOW_TRIGGERS:
return ShowTriggers(interpreter_context);
}
});
return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr},
trigger_notification = std::move(trigger_notification), notifications](
AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
if (UNLIKELY(!pull_plan)) {
pull_plan = std::make_shared<PullPlanVector>(callback_fn());
}
if (pull_plan->Pull(stream, n)) {
if (trigger_notification) {
notifications->push_back(std::move(*trigger_notification));
}
return QueryHandlerResult::COMMIT;
}
return std::nullopt;
},
RWType::NONE};
// False positive report for the std::make_shared above
// NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks)
}
PreparedQuery PrepareStreamQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
DbAccessor *dba,
const std::map<std::string, storage::v3::PropertyValue> & /*user_parameters*/,
const std::string *username) {
if (in_explicit_transaction) {
throw StreamQueryInMulticommandTxException();
}
auto *stream_query = utils::Downcast<StreamQuery>(parsed_query.query);
MG_ASSERT(stream_query);
auto callback =
HandleStreamQuery(stream_query, parsed_query.parameters, interpreter_context, dba, username, notifications);
return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
if (UNLIKELY(!pull_plan)) {
pull_plan = std::make_shared<PullPlanVector>(callback_fn());
}
if (pull_plan->Pull(stream, n)) {
return QueryHandlerResult::COMMIT;
}
return std::nullopt;
},
RWType::NONE};
// False positive report for the std::make_shared above
// NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks)
}
constexpr auto ToStorageIsolationLevel(const IsolationLevelQuery::IsolationLevel isolation_level) noexcept {
switch (isolation_level) {
case IsolationLevelQuery::IsolationLevel::SNAPSHOT_ISOLATION:
@ -1912,10 +1534,6 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
db_accessor_ = std::make_unique<storage::v3::Shard::Accessor>(
interpreter_context_->db->Access(coordinator::Hlc{}, GetIsolationLevelOverride()));
execution_db_accessor_.emplace(db_accessor_.get());
if (utils::Downcast<CypherQuery>(parsed_query.query) && interpreter_context_->trigger_store.HasTriggers()) {
trigger_context_collector_.emplace(interpreter_context_->trigger_store.GetEventTypes());
}
}
utils::Timer planning_timer;
@ -1924,8 +1542,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
if (utils::Downcast<CypherQuery>(parsed_query.query)) {
prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
&*execution_db_accessor_, &query_execution->execution_memory,
&query_execution->notifications,
trigger_context_collector_ ? &*trigger_context_collector_ : nullptr);
&query_execution->notifications);
} else if (utils::Downcast<ExplainQuery>(parsed_query.query)) {
prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
&*execution_db_accessor_, &query_execution->execution_memory_with_exception);
@ -1960,13 +1577,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
} else if (utils::Downcast<FreeMemoryQuery>(parsed_query.query)) {
prepared_query = PrepareFreeMemoryQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
} else if (utils::Downcast<TriggerQuery>(parsed_query.query)) {
prepared_query =
PrepareTriggerQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
interpreter_context_, &*execution_db_accessor_, params, username);
throw std::runtime_error("Unimplemented");
} else if (utils::Downcast<StreamQuery>(parsed_query.query)) {
prepared_query =
PrepareStreamQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
interpreter_context_, &*execution_db_accessor_, params, username);
throw std::runtime_error("unimplemented");
} else if (utils::Downcast<IsolationLevelQuery>(parsed_query.query)) {
prepared_query =
PrepareIsolationLevelQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_, this);
@ -2007,35 +1620,8 @@ void Interpreter::Abort() {
db_accessor_->Abort();
execution_db_accessor_.reset();
db_accessor_.reset();
trigger_context_collector_.reset();
}
namespace {
void RunTriggersIndividually(const utils::SkipList<Trigger> &triggers, InterpreterContext *interpreter_context,
TriggerContext trigger_context) {
// Run the triggers
for (const auto &trigger : triggers.access()) {
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
// create a new transaction for each trigger
auto storage_acc = interpreter_context->db->Access(coordinator::Hlc{});
DbAccessor db_accessor{&storage_acc};
trigger_context.AdaptForAccessor(&db_accessor);
try {
trigger.Execute(&db_accessor, &execution_memory, interpreter_context->config.execution_timeout_sec,
&interpreter_context->is_shutting_down, trigger_context, interpreter_context->auth_checker);
} catch (const utils::BasicException &exception) {
spdlog::warn("Trigger '{}' failed with exception:\n{}", trigger.Name(), exception.what());
db_accessor.Abort();
continue;
}
db_accessor.Commit();
}
}
} // namespace
void Interpreter::Commit() {
// It's possible that some queries did not finish because the user did
// not pull all of the results from the query.
@ -2044,49 +1630,12 @@ void Interpreter::Commit() {
// a query.
if (!db_accessor_) return;
std::optional<TriggerContext> trigger_context = std::nullopt;
if (trigger_context_collector_) {
trigger_context.emplace(std::move(*trigger_context_collector_).TransformToTriggerContext());
trigger_context_collector_.reset();
}
if (trigger_context) {
// Run the triggers
for (const auto &trigger : interpreter_context_->trigger_store.BeforeCommitTriggers().access()) {
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
AdvanceCommand();
try {
trigger.Execute(&*execution_db_accessor_, &execution_memory, interpreter_context_->config.execution_timeout_sec,
&interpreter_context_->is_shutting_down, *trigger_context, interpreter_context_->auth_checker);
} catch (const utils::BasicException &e) {
throw utils::BasicException(
fmt::format("Trigger '{}' caused the transaction to fail.\nException: {}", trigger.Name(), e.what()));
}
}
SPDLOG_DEBUG("Finished executing before commit triggers");
}
const auto reset_necessary_members = [this]() {
execution_db_accessor_.reset();
db_accessor_.reset();
trigger_context_collector_.reset();
};
db_accessor_->Commit(coordinator::Hlc{});
// The ordered execution of after commit triggers is heavily depending on the exclusiveness of db_accessor_->Commit():
// only one of the transactions can be commiting at the same time, so when the commit is finished, that transaction
// probably will schedule its after commit triggers, because the other transactions that want to commit are still
// waiting for commiting or one of them just started commiting its changes.
// This means the ordered execution of after commit triggers are not guaranteed.
if (trigger_context && interpreter_context_->trigger_store.AfterCommitTriggers().size() > 0) {
interpreter_context_->after_commit_trigger_pool.AddTask(
[trigger_context = std::move(*trigger_context), interpreter_context = this->interpreter_context_,
user_transaction = std::shared_ptr(std::move(db_accessor_))]() mutable {
RunTriggersIndividually(interpreter_context->trigger_store.AfterCommitTriggers(), interpreter_context,
std::move(trigger_context));
SPDLOG_DEBUG("Finished executing after commit triggers"); // NOLINT(bugprone-lambda-function-name)
});
}
reset_necessary_members();

View File

@ -27,8 +27,6 @@
#include "query/v2/plan/operator.hpp"
#include "query/v2/plan/read_write_type_checker.hpp"
#include "query/v2/stream.hpp"
#include "query/v2/stream/streams.hpp"
#include "query/v2/trigger.hpp"
#include "storage/v3/isolation_level.hpp"
#include "storage/v3/name_id_mapper.hpp"
#include "utils/event_counter.hpp"
@ -180,13 +178,8 @@ struct InterpreterContext {
utils::SkipList<QueryCacheEntry> ast_cache;
utils::SkipList<PlanCacheEntry> plan_cache;
TriggerStore trigger_store;
utils::ThreadPool after_commit_trigger_pool{1};
const InterpreterConfig config;
query::v2::stream::Streams streams;
storage::v3::LabelId NameToLabelId(std::string_view label_name) {
return storage::v3::LabelId::FromUint(query_id_mapper.NameToId(label_name));
}
@ -334,7 +327,6 @@ class Interpreter final {
// move this unique_ptr into a shrared_ptr.
std::unique_ptr<storage::v3::Shard::Accessor> db_accessor_;
std::optional<DbAccessor> execution_db_accessor_;
std::optional<TriggerContextCollector> trigger_context_collector_;
bool in_explicit_transaction_{false};
bool expect_rollback_{false};

File diff suppressed because it is too large Load Diff

View File

@ -25,15 +25,20 @@
#include "query/v2/common.hpp"
#include "query/v2/frontend/ast/ast.hpp"
#include "expr/semantic/symbol.hpp"
#include "query/v2//bindings/typed_value.hpp"
#include "query/v2//bindings/frame.hpp"
#include "query/v2//bindings/symbol_table.hpp"
#include "query/v2/bindings/typed_value.hpp"
#include "query/v2/bindings/frame.hpp"
#include "query/v2/bindings/symbol_table.hpp"
#include "storage/v3/id_types.hpp"
#include "utils/bound.hpp"
#include "utils/fnv.hpp"
#include "utils/memory.hpp"
#include "utils/visitor.hpp"
#include "utils/logging.hpp"
#include "query/v2/accessors.hpp"
using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor;
using EdgeAccessor = memgraph::query::v2::accessors::EdgeAccessor;
using Path = memgraph::query::v2::accessors::Path;
cpp<#
(lcp:namespace memgraph)

View File

@ -272,6 +272,7 @@ class RuleBasedPlanner {
PropertiesMapList vector_props;
vector_props.reserve(node_properties->size());
for (const auto &kv : *node_properties) {
// TODO(kostasrim) GetProperty should be implemented in terms of ShardRequestManager NameToProperty
vector_props.push_back({GetProperty(kv.first), kv.second});
}
return std::move(vector_props);

View File

@ -14,6 +14,7 @@
#include <chrono>
#include <iostream>
#include <map>
#include <memory>
#include <optional>
#include <unordered_map>
#include <utility>
@ -33,32 +34,45 @@ struct Value;
struct Label {
LabelId id;
friend bool operator==(const Label &lhs, const Label &rhs) { return lhs.id == rhs.id; }
};
// TODO(kostasrim) update this with CompoundKey, same for the rest of the file.
using PrimaryKey = std::vector<Value>;
using VertexId = std::pair<Label, PrimaryKey>;
inline bool operator==(const VertexId &lhs, const VertexId &rhs) {
return (lhs.first == rhs.first) && (lhs.second == rhs.second);
}
using Gid = size_t;
using PropertyId = memgraph::storage::v3::PropertyId;
struct EdgeType {
uint64_t id;
friend bool operator==(const EdgeType &lhs, const EdgeType &rhs) = default;
};
struct EdgeId {
Gid gid;
};
struct Vertex {
VertexId id;
std::vector<Label> labels;
friend bool operator==(const Vertex &lhs, const Vertex &rhs) {
return (lhs.id == rhs.id) && (lhs.labels == rhs.labels);
}
};
struct Edge {
VertexId src;
VertexId dst;
EdgeId id;
EdgeType type;
};
struct Vertex {
VertexId id;
std::vector<Label> labels;
friend bool operator==(const Edge &lhs, const Edge &rhs) {
return (lhs.src == rhs.src) && (lhs.dst == rhs.dst) && (lhs.type == rhs.type);
}
};
struct PathPart {
@ -81,6 +95,7 @@ struct Value {
explicit Value(const double val) : type(Type::Double), double_v(val) {}
explicit Value(const Vertex val) : type(Type::Vertex), vertex_v(val) {}
explicit Value(const Edge val) : type(Type::Edge), edge_v(val) {}
explicit Value(const std::string &val) : type(Type::String) { new (&string_v) std::string(val); }
explicit Value(const char *val) : type(Type::String) { new (&string_v) std::string(val); }
@ -300,6 +315,34 @@ struct Value {
Edge edge_v;
Path path_v;
};
friend bool operator==(const Value &lhs, const Value &rhs) {
if (lhs.type != rhs.type) {
return false;
}
switch (lhs.type) {
case Value::Type::Null:
return true;
case Value::Type::Bool:
return lhs.bool_v == rhs.bool_v;
case Value::Type::Int64:
return lhs.int_v == rhs.int_v;
case Value::Type::Double:
return lhs.double_v == rhs.double_v;
case Value::Type::String:
return lhs.string_v == rhs.string_v;
case Value::Type::List:
return lhs.list_v == rhs.list_v;
case Value::Type::Map:
return lhs.map_v == rhs.map_v;
case Value::Type::Vertex:
return lhs.vertex_v == rhs.vertex_v;
case Value::Type::Edge:
return lhs.edge_v == rhs.edge_v;
case Value::Type::Path:
return true;
}
}
};
struct ValuesMap {
@ -343,9 +386,9 @@ struct ScanVerticesRequest {
};
struct ScanResultRow {
Value vertex;
Vertex vertex;
// empty() is no properties returned
std::map<PropertyId, Value> props;
std::vector<std::pair<PropertyId, Value>> props;
};
struct ScanVerticesResponse {
@ -374,6 +417,11 @@ struct GetPropertiesResponse {
enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 };
struct VertexEdgeId {
VertexId vertex_id;
std::optional<EdgeId> next_id;
};
struct ExpandOneRequest {
Hlc transaction_id;
std::vector<VertexId> src_vertices;
@ -437,7 +485,14 @@ struct NewVertex {
std::vector<std::pair<PropertyId, Value>> properties;
};
struct NewVertexLabel {
std::string label;
PrimaryKey primary_key;
std::vector<std::pair<PropertyId, Value>> properties;
};
struct CreateVerticesRequest {
std::string label;
Hlc transaction_id;
std::vector<NewVertex> new_vertices;
};

View File

@ -0,0 +1,405 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <deque>
#include <iostream>
#include <map>
#include <optional>
#include <random>
#include <set>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <vector>
#include "coordinator/coordinator_client.hpp"
#include "coordinator/coordinator_rsm.hpp"
#include "coordinator/shard_map.hpp"
#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/rsm/raft.hpp"
#include "io/rsm/rsm_client.hpp"
#include "io/rsm/shard_rsm.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "query/v2/accessors.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/value_conversions.hpp"
#include "utils/result.hpp"
namespace memgraph::msgs {
template <typename TStorageClient>
class RsmStorageClientManager {
public:
using CompoundKey = memgraph::io::rsm::ShardRsmKey;
using Shard = memgraph::coordinator::Shard;
using LabelId = memgraph::storage::v3::LabelId;
RsmStorageClientManager() = default;
RsmStorageClientManager(const RsmStorageClientManager &) = delete;
RsmStorageClientManager(RsmStorageClientManager &&) = delete;
RsmStorageClientManager &operator=(const RsmStorageClientManager &) = delete;
RsmStorageClientManager &operator=(RsmStorageClientManager &&) = delete;
~RsmStorageClientManager() = default;
void AddClient(const LabelId label_id, Shard key, TStorageClient client) {
cli_cache_[label_id].insert({std::move(key), std::move(client)});
}
bool Exists(const LabelId label_id, const Shard &key) { return cli_cache_[label_id].contains(key); }
void PurgeCache() { cli_cache_.clear(); }
TStorageClient &GetClient(const LabelId label_id, const Shard &key) { return cli_cache_[label_id].find(key)->second; }
private:
std::map<LabelId, std::map<Shard, TStorageClient>> cli_cache_;
};
template <typename TRequest>
struct ExecutionState {
using CompoundKey = memgraph::io::rsm::ShardRsmKey;
using Shard = memgraph::coordinator::Shard;
enum State : int8_t { INITIALIZING, EXECUTING, COMPLETED };
// label is optional because some operators can create/remove etc, vertices. These kind of requests contain the label
// on the request itself.
std::optional<std::string> label;
// CompoundKey is optional because some operators require to iterate over all the available keys
// of a shard. One example is ScanAll, where we only require the field label.
std::optional<CompoundKey> key;
// Transaction id to be filled by the ShardRequestManager implementation
memgraph::coordinator::Hlc transaction_id;
// Initialized by ShardRequestManager implementation. This vector is filled with the shards that
// the ShardRequestManager impl will send requests to. When a request to a shard exhausts it, meaning that
// it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes
// empty, it means that all of the requests have completed succefully.
std::vector<Shard> shard_cache;
// 1-1 mapping with `shard_cache`.
// A vector that tracks request metatdata for each shard (For example, next_id for a ScanAll on Shard A)
std::vector<TRequest> requests;
State state = INITIALIZING;
};
class ShardRequestManagerInterface {
public:
using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor;
ShardRequestManagerInterface() = default;
ShardRequestManagerInterface(const ShardRequestManagerInterface &) = delete;
ShardRequestManagerInterface(ShardRequestManagerInterface &&) = delete;
ShardRequestManagerInterface &operator=(const ShardRequestManagerInterface &) = delete;
ShardRequestManagerInterface &&operator=(ShardRequestManagerInterface &&) = delete;
virtual ~ShardRequestManagerInterface() = default;
virtual void StartTransaction() = 0;
virtual std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) = 0;
virtual std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
std::vector<NewVertex> new_vertices) = 0;
virtual std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) = 0;
virtual memgraph::storage::v3::PropertyId NameToProperty(const std::string &name) const = 0;
virtual memgraph::storage::v3::LabelId LabelNameToLabelId(const std::string &name) const = 0;
virtual bool IsPrimaryKey(PropertyId name) const = 0;
};
// TODO(kostasrim)rename this class template
template <typename TTransport>
class ShardRequestManager : public ShardRequestManagerInterface {
public:
using WriteRequests = CreateVerticesRequest;
using WriteResponses = CreateVerticesResponse;
using ReadRequests = std::variant<ScanVerticesRequest, ExpandOneRequest>;
using ReadResponses = std::variant<ScanVerticesResponse, ExpandOneResponse>;
using StorageClient =
memgraph::coordinator::RsmClient<TTransport, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
using CoordinatorClient = memgraph::coordinator::CoordinatorClient<TTransport>;
using Address = memgraph::io::Address;
using Shard = memgraph::coordinator::Shard;
using ShardMap = memgraph::coordinator::ShardMap;
using CompoundKey = memgraph::coordinator::PrimaryKey;
using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor;
ShardRequestManager(CoordinatorClient coord, memgraph::io::Io<TTransport> &&io)
: coord_cli_(std::move(coord)), io_(std::move(io)) {}
ShardRequestManager(const ShardRequestManager &) = delete;
ShardRequestManager(ShardRequestManager &&) = delete;
ShardRequestManager &operator=(const ShardRequestManager &) = delete;
ShardRequestManager &operator=(ShardRequestManager &&) = delete;
~ShardRequestManager() override {}
void StartTransaction() override {
memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
auto write_res = coord_cli_.SendWriteRequest(req);
if (write_res.HasError()) {
throw std::runtime_error("HLC request failed");
}
auto coordinator_write_response = write_res.GetValue();
auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_write_response);
// Transaction ID to be used later...
transaction_id_ = hlc_response.new_hlc;
if (hlc_response.fresher_shard_map) {
shards_map_ = hlc_response.fresher_shard_map.value();
}
}
memgraph::storage::v3::PropertyId NameToProperty(const std::string &name) const override {
return *shards_map_.GetPropertyId(name);
}
memgraph::storage::v3::LabelId LabelNameToLabelId(const std::string &name) const override {
return shards_map_.GetLabelId(name);
}
bool IsPrimaryKey(const PropertyId name) const override {
return std::find_if(shards_map_.properties.begin(), shards_map_.properties.end(),
[name](auto &pr) { return pr.second == name; }) != shards_map_.properties.end();
}
// TODO(kostasrim) Simplify return result
std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) override {
MaybeInitializeExecutionState(state);
std::vector<ScanVerticesResponse> responses;
auto &shard_cache_ref = state.shard_cache;
size_t id = 0;
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
auto &storage_client = GetStorageClientForShard(
*state.label, storage::conversions::ConvertPropertyVector(state.requests[id].start_id.second));
// TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture
// instead.
auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
// RETRY on timeouts?
// Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test
if (read_response_result.HasError()) {
throw std::runtime_error("ScanAll request timedout");
}
auto &response = std::get<ScanVerticesResponse>(read_response_result.GetValue());
if (!response.success) {
throw std::runtime_error("ScanAll request did not succeed");
}
if (!response.next_start_id) {
shard_it = shard_cache_ref.erase(shard_it);
} else {
state.requests[id].start_id.second = response.next_start_id->second;
++shard_it;
}
responses.push_back(std::move(response));
}
// We are done with this state
MaybeCompleteState(state);
// TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return
// result of storage_client.SendReadRequest()).
return PostProcess(std::move(responses));
}
std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
std::vector<NewVertex> new_vertices) override {
MG_ASSERT(!new_vertices.empty());
MaybeInitializeExecutionState(state, new_vertices);
std::vector<CreateVerticesResponse> responses;
auto &shard_cache_ref = state.shard_cache;
size_t id = 0;
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
// This is fine because all new_vertices of each request end up on the same shard
const auto labels = state.requests[id].new_vertices[0].label_ids;
auto primary_key = state.requests[id].new_vertices[0].primary_key;
auto &storage_client = GetStorageClientForShard(*shard_it, labels[0].id);
auto write_response_result = storage_client.SendWriteRequest(state.requests[id]);
// RETRY on timeouts?
// Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test
if (write_response_result.HasError()) {
throw std::runtime_error("CreateVertices request timedout");
}
if (!write_response_result.GetValue().success) {
throw std::runtime_error("CreateVertices request did not succeed");
}
responses.push_back(write_response_result.GetValue());
shard_it = shard_cache_ref.erase(shard_it);
}
// We are done with this state
MaybeCompleteState(state);
// TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return
// result of storage_client.SendReadRequest()).
return responses;
}
std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) override {
// TODO(kostasrim)Update to limit the batch size here
// Expansions of the destination must be handled by the caller. For example
// match (u:L1 { prop : 1 })-[:Friend]-(v:L1)
// For each vertex U, the ExpandOne will result in <U, Edges>. The destination vertex and its properties
// must be fetched again with an ExpandOne(Edges.dst)
MaybeInitializeExecutionState(state);
std::vector<ExpandOneResponse> responses;
auto &shard_cache_ref = state.shard_cache;
size_t id = 0;
// pending_requests on shards
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
const Label primary_label = state.requests[id].src_vertices[0].first;
auto &storage_client = GetStorageClientForShard(*shard_it, primary_label.id);
auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
// RETRY on timeouts?
// Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map
if (read_response_result.HasError()) {
throw std::runtime_error("ExpandOne request timedout");
}
auto &response = std::get<ExpandOneResponse>(read_response_result.GetValue());
responses.push_back(std::move(response));
}
return responses;
}
private:
std::vector<VertexAccessor> PostProcess(std::vector<ScanVerticesResponse> &&responses) const {
std::vector<VertexAccessor> accessors;
for (auto &response : responses) {
for (auto &result_row : response.results) {
accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props)));
}
}
return accessors;
}
template <typename ExecutionState>
void ThrowIfStateCompleted(ExecutionState &state) const {
if (state.state == ExecutionState::COMPLETED) [[unlikely]] {
throw std::runtime_error("State is completed and must be reset");
}
}
template <typename ExecutionState>
void MaybeCompleteState(ExecutionState &state) const {
if (state.requests.empty()) {
state.state = ExecutionState::COMPLETED;
}
}
template <typename ExecutionState>
bool ShallNotInitializeState(ExecutionState &state) const {
return state.state != ExecutionState::INITIALIZING;
}
void MaybeInitializeExecutionState(ExecutionState<CreateVerticesRequest> &state,
std::vector<NewVertex> new_vertices) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
std::map<Shard, CreateVerticesRequest> per_shard_request_table;
for (auto &new_vertex : new_vertices) {
auto shard = shards_map_.GetShardForKey(new_vertex.label_ids[0].id,
storage::conversions::ConvertPropertyVector(new_vertex.primary_key));
if (!per_shard_request_table.contains(shard)) {
CreateVerticesRequest create_v_rqst{.transaction_id = transaction_id_};
per_shard_request_table.insert(std::pair(shard, std::move(create_v_rqst)));
state.shard_cache.push_back(shard);
}
per_shard_request_table[shard].new_vertices.push_back(std::move(new_vertex));
}
for (auto &[shard, rqst] : per_shard_request_table) {
state.requests.push_back(std::move(rqst));
}
state.state = ExecutionState<CreateVerticesRequest>::EXECUTING;
}
void MaybeInitializeExecutionState(ExecutionState<ScanVerticesRequest> &state) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
auto shards = shards_map_.GetShards(*state.label);
for (auto &[key, shard] : shards) {
state.shard_cache.push_back(std::move(shard));
ScanVerticesRequest rqst;
rqst.transaction_id = transaction_id_;
rqst.start_id.second = storage::conversions::ConvertValueVector(key);
state.requests.push_back(std::move(rqst));
}
state.state = ExecutionState<ScanVerticesRequest>::EXECUTING;
}
void MaybeInitializeExecutionState(ExecutionState<ExpandOneRequest> &state) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
std::map<Shard, ExpandOneRequest> per_shard_request_table;
MG_ASSERT(state.requests.size() == 1);
auto top_level_rqst = std::move(*state.requests.begin());
auto top_level_rqst_template = top_level_rqst;
top_level_rqst_template.src_vertices.clear();
top_level_rqst_template.edge_types.clear();
state.requests.clear();
size_t id = 0;
for (const auto &vertex : top_level_rqst.src_vertices) {
auto shard =
shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second));
if (!per_shard_request_table.contains(shard)) {
ExpandOneRequest expand_v_rqst = top_level_rqst_template;
per_shard_request_table.insert(std::pair(shard, std::move(expand_v_rqst)));
state.shard_cache.push_back(shard);
}
per_shard_request_table[shard].src_vertices.push_back(vertex);
per_shard_request_table[shard].edge_types.push_back(top_level_rqst.edge_types[id]);
++id;
}
for (auto &[shard, rqst] : per_shard_request_table) {
state.requests.push_back(std::move(rqst));
}
state.state = ExecutionState<ExpandOneRequest>::EXECUTING;
}
StorageClient &GetStorageClientForShard(Shard shard, LabelId label_id) {
if (!storage_cli_manager_.Exists(label_id, shard)) {
AddStorageClientToManager(shard, label_id);
}
return storage_cli_manager_.GetClient(label_id, shard);
}
StorageClient &GetStorageClientForShard(const std::string &label, const CompoundKey &key) {
auto shard = shards_map_.GetShardForKey(label, key);
auto label_id = shards_map_.GetLabelId(label);
return GetStorageClientForShard(std::move(shard), label_id);
}
void AddStorageClientToManager(Shard target_shard, const LabelId &label_id) {
MG_ASSERT(!target_shard.empty());
auto leader_addr = target_shard.front();
std::vector<Address> addresses;
addresses.reserve(target_shard.size());
for (auto &address : target_shard) {
addresses.push_back(std::move(address.address));
}
auto cli = StorageClient(io_, std::move(leader_addr.address), std::move(addresses));
storage_cli_manager_.AddClient(label_id, target_shard, std::move(cli));
}
ShardMap shards_map_;
CoordinatorClient coord_cli_;
RsmStorageClientManager<StorageClient> storage_cli_manager_;
memgraph::io::Io<TTransport> io_;
memgraph::coordinator::Hlc transaction_id_;
// TODO(kostasrim) Add batch prefetching
};
} // namespace memgraph::msgs

View File

@ -10,6 +10,7 @@
// licenses/APL.txt.
#include "expr/typed_value.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/property_value.hpp"
#include "utils/memory.hpp"
@ -165,4 +166,40 @@ storage::v3::PropertyValue TypedToPropertyValue(const TTypedValue &value) {
}
throw expr::TypedValueException("Unsupported conversion from TTypedValue to PropertyValue");
}
template <typename TTypedValue>
msgs::Value TypedValueToValue(const TTypedValue &value) {
using Value = msgs::Value;
switch (value.type()) {
case TTypedValue::Type::Null:
return {};
case TTypedValue::Type::Bool:
return Value(value.ValueBool());
case TTypedValue::Type::Int:
return Value(value.ValueInt());
case TTypedValue::Type::Double:
return Value(value.ValueDouble());
case TTypedValue::Type::String:
return Value(std::string(value.ValueString()));
case TTypedValue::Type::List: {
const auto &src = value.ValueList();
std::vector<msgs::Value> dst;
dst.reserve(src.size());
std::transform(src.begin(), src.end(), std::back_inserter(dst),
[](const auto &val) { return TypedValueToValue(val); });
return Value(std::move(dst));
}
case TTypedValue::Type::Map: {
const auto &src = value.ValueMap();
std::map<std::string, Value> dst;
for (const auto &elem : src) {
dst.insert({std::string(elem.first), TypedValueToValue(elem.second)});
}
return Value(std::move(dst));
}
default:
break;
}
throw expr::TypedValueException("Unsupported conversion from TTypedValue to PropertyValue");
}
} // namespace memgraph::storage::v3

View File

@ -14,6 +14,7 @@
#include "query/v2/requests.hpp"
#include "storage/v3/shard_rsm.hpp"
#include "storage/v3/value_conversions.hpp"
#include "storage/v3/vertex_accessor.hpp"
using memgraph::msgs::Label;
@ -21,83 +22,12 @@ using memgraph::msgs::PropertyId;
using memgraph::msgs::Value;
using memgraph::msgs::VertexId;
using memgraph::storage::conversions::ConvertPropertyVector;
using memgraph::storage::conversions::ConvertValueVector;
using memgraph::storage::conversions::ToPropertyValue;
using memgraph::storage::conversions::ToValue;
namespace {
// TODO(gvolfing use come algorithm instead of explicit for loops)
memgraph::storage::v3::PropertyValue ToPropertyValue(Value &&value) {
using PV = memgraph::storage::v3::PropertyValue;
PV ret;
switch (value.type) {
case Value::Type::Null:
return PV{};
case Value::Type::Bool:
return PV(value.bool_v);
case Value::Type::Int64:
return PV(static_cast<int64_t>(value.int_v));
case Value::Type::Double:
return PV(value.double_v);
case Value::Type::String:
return PV(value.string_v);
case Value::Type::List: {
std::vector<PV> list;
for (auto &elem : value.list_v) {
list.emplace_back(ToPropertyValue(std::move(elem)));
}
return PV(list);
}
case Value::Type::Map: {
std::map<std::string, PV> map;
for (auto &[key, value] : value.map_v) {
map.emplace(std::make_pair(key, ToPropertyValue(std::move(value))));
}
return PV(map);
}
// These are not PropertyValues
case Value::Type::Vertex:
case Value::Type::Edge:
case Value::Type::Path:
MG_ASSERT(false, "Not PropertyValue");
}
return ret;
}
Value ToValue(const memgraph::storage::v3::PropertyValue &pv) {
using memgraph::storage::v3::PropertyValue;
switch (pv.type()) {
case PropertyValue::Type::Bool:
return Value(pv.ValueBool());
case PropertyValue::Type::Double:
return Value(pv.ValueDouble());
case PropertyValue::Type::Int:
return Value(pv.ValueInt());
case PropertyValue::Type::List: {
std::vector<Value> list(pv.ValueList().size());
for (const auto &elem : pv.ValueList()) {
list.emplace_back(ToValue(elem));
}
return Value(list);
}
case PropertyValue::Type::Map: {
std::map<std::string, Value> map;
for (const auto &[key, val] : pv.ValueMap()) {
// maybe use std::make_pair once the && issue is resolved.
map.emplace(key, ToValue(val));
}
return Value(map);
}
case PropertyValue::Type::Null:
return Value{};
case PropertyValue::Type::String:
return Value(pv.ValueString());
case PropertyValue::Type::TemporalData: {
// TBD -> we need to specify this in the messages, not a priority.
MG_ASSERT(false, "Temporal datatypes are not yet implemented on Value!");
return Value{};
}
}
}
std::vector<std::pair<memgraph::storage::v3::PropertyId, memgraph::storage::v3::PropertyValue>> ConvertPropertyMap(
std::vector<std::pair<PropertyId, Value>> &&properties) {
@ -111,23 +41,13 @@ std::vector<std::pair<memgraph::storage::v3::PropertyId, memgraph::storage::v3::
return ret;
}
std::vector<memgraph::storage::v3::PropertyValue> ConvertPropertyVector(std::vector<Value> &&vec) {
std::vector<memgraph::storage::v3::PropertyValue> ret;
ret.reserve(vec.size());
std::vector<std::pair<memgraph::storage::v3::PropertyId, Value>> FromMap(
const std::map<PropertyId, Value> &properties) {
std::vector<std::pair<memgraph::storage::v3::PropertyId, Value>> ret;
ret.reserve(properties.size());
for (auto &elem : vec) {
ret.push_back(ToPropertyValue(std::move(elem)));
}
return ret;
}
std::vector<Value> ConvertValueVector(const std::vector<memgraph::storage::v3::PropertyValue> &vec) {
std::vector<Value> ret;
ret.reserve(vec.size());
for (const auto &elem : vec) {
ret.push_back(ToValue(elem));
for (const auto &[key, value] : properties) {
ret.emplace_back(std::make_pair(key, value));
}
return ret;
@ -338,10 +258,12 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
bool did_reach_starting_point = false;
uint64_t sample_counter = 0;
const auto start_ids = ConvertPropertyVector(std::move(req.start_id.second));
for (auto it = vertex_iterable.begin(); it != vertex_iterable.end(); ++it) {
const auto &vertex = *it;
if (ConvertPropertyVector(std::move(req.start_id.second)) == vertex.PrimaryKey(View(req.storage_view)).GetValue()) {
if (start_ids == vertex.PrimaryKey(View(req.storage_view)).GetValue()) {
did_reach_starting_point = true;
}
@ -358,8 +280,8 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
continue;
}
results.emplace_back(
msgs::ScanResultRow{.vertex = ConstructValueVertex(vertex, view), .props = found_props.value()});
results.emplace_back(msgs::ScanResultRow{.vertex = ConstructValueVertex(vertex, view).vertex_v,
.props = FromMap(found_props.value())});
++sample_counter;
if (sample_counter == req.batch_limit) {

View File

@ -0,0 +1,131 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "query/v2/requests.hpp"
#include "storage/v3/property_value.hpp"
#include "utils/logging.hpp"
#include <map>
#include <string>
#include <utility>
#include <vector>
#pragma once
// TODO(kostasrim) Think about long term sustainability
// This should not be put under v3 because ADL will mess that up.
namespace memgraph::storage::conversions {
using memgraph::msgs::PropertyId;
using memgraph::msgs::Value;
using memgraph::msgs::VertexId;
// TODO(gvolfing use come algorithm instead of explicit for loops)
inline memgraph::storage::v3::PropertyValue ToPropertyValue(Value value) {
using PV = memgraph::storage::v3::PropertyValue;
PV ret;
switch (value.type) {
case Value::Type::Null:
return PV{};
case Value::Type::Bool:
return PV(value.bool_v);
case Value::Type::Int64:
return PV(static_cast<int64_t>(value.int_v));
case Value::Type::Double:
return PV(value.double_v);
case Value::Type::String:
return PV(value.string_v);
case Value::Type::List: {
std::vector<PV> list;
for (auto &elem : value.list_v) {
list.emplace_back(ToPropertyValue(std::move(elem)));
}
return PV(list);
}
case Value::Type::Map: {
std::map<std::string, PV> map;
for (auto &[key, value] : value.map_v) {
map.emplace(std::make_pair(key, ToPropertyValue(std::move(value))));
}
return PV(map);
}
// These are not PropertyValues
case Value::Type::Vertex:
case Value::Type::Edge:
case Value::Type::Path:
MG_ASSERT(false, "Not PropertyValue");
}
return ret;
}
inline Value ToValue(const memgraph::storage::v3::PropertyValue &pv) {
using memgraph::storage::v3::PropertyValue;
switch (pv.type()) {
case PropertyValue::Type::Bool:
return Value(pv.ValueBool());
case PropertyValue::Type::Double:
return Value(pv.ValueDouble());
case PropertyValue::Type::Int:
return Value(pv.ValueInt());
case PropertyValue::Type::List: {
std::vector<Value> list;
list.reserve(pv.ValueList().size());
for (const auto &elem : pv.ValueList()) {
list.emplace_back(ToValue(elem));
}
return Value(list);
}
case PropertyValue::Type::Map: {
std::map<std::string, Value> map;
for (const auto &[key, val] : pv.ValueMap()) {
// maybe use std::make_pair once the && issue is resolved.
map.emplace(key, ToValue(val));
}
return Value(map);
}
case PropertyValue::Type::Null:
return Value{};
case PropertyValue::Type::String:
return Value(pv.ValueString());
case PropertyValue::Type::TemporalData: {
// TBD -> we need to specify this in the messages, not a priority.
MG_ASSERT(false, "Temporal datatypes are not yet implemented on Value!");
return Value{};
}
}
}
inline std::vector<memgraph::storage::v3::PropertyValue> ConvertPropertyVector(std::vector<Value> vec) {
std::vector<memgraph::storage::v3::PropertyValue> ret;
ret.reserve(vec.size());
for (auto &elem : vec) {
ret.push_back(ToPropertyValue(std::move(elem)));
}
return ret;
}
inline std::vector<Value> ConvertValueVector(const std::vector<memgraph::storage::v3::PropertyValue> &vec) {
std::vector<Value> ret;
ret.reserve(vec.size());
for (const auto &elem : vec) {
ret.push_back(ToValue(elem));
}
return ret;
}
} // namespace memgraph::storage::conversions

View File

@ -16,15 +16,21 @@ function(add_simulation_test test_cpp)
# used to help create two targets of the same name even though CMake
# requires unique logical target names
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
target_link_libraries(${target_name} mg-storage-v3 mg-communication gtest gmock mg-utils mg-io mg-io-simulator mg-coordinator Boost::headers)
# sanitize
target_compile_options(${target_name} PRIVATE -fsanitize=${san})
target_link_options(${target_name} PRIVATE -fsanitize=${san})
target_link_libraries(${target_name} mg-storage-v3 mg-communication gtest gmock mg-utils mg-io mg-io-simulator mg-coordinator Boost::headers mg-query-v2)
# register test
add_test(${target_name} ${exec_name})
add_dependencies(memgraph__simulation ${target_name})
endfunction(add_simulation_test)
add_simulation_test(basic_request.cpp)
add_simulation_test(raft.cpp)
add_simulation_test(trial_query_storage/query_storage_test.cpp)
add_simulation_test(sharded_map.cpp)
add_simulation_test(basic_request.cpp address)
add_simulation_test(raft.cpp address)
add_simulation_test(trial_query_storage/query_storage_test.cpp address)
add_simulation_test(sharded_map.cpp address)
add_simulation_test(shard_request_manager.cpp address)
add_simulation_test(shard_rsm.cpp)

126
tests/simulation/common.hpp Normal file
View File

@ -0,0 +1,126 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
/// The ShardRsm is a simple in-memory raft-backed kv store that can be used for simple testing
/// and implementation of some query engine logic before storage engines are fully implemented.
///
/// To implement multiple read and write commands, change the StorageRead* and StorageWrite* requests
/// and responses to a std::variant of the different options, and route them to specific handlers in
/// the ShardRsm's Read and Apply methods. Remember that Read is called immediately when the Raft
/// leader receives the request, and does not replicate anything over Raft. Apply is called only
/// AFTER the StorageWriteRequest is replicated to a majority of Raft peers, and the result of calling
/// ShardRsm::Apply(StorageWriteRequest) is returned to the client that submitted the request.
#include <algorithm>
#include <deque>
#include <iostream>
#include <map>
#include <optional>
#include <set>
#include <thread>
#include <vector>
#include <iostream>
#include "coordinator/hybrid_logical_clock.hpp"
#include "io/address.hpp"
#include "io/rsm/raft.hpp"
#include "io/rsm/shard_rsm.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/value_conversions.hpp"
#include "utils/logging.hpp"
using memgraph::coordinator::Hlc;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::msgs::CreateVerticesRequest;
using memgraph::msgs::CreateVerticesResponse;
using memgraph::msgs::ExpandOneRequest;
using memgraph::msgs::ExpandOneResponse;
using memgraph::msgs::ListedValues;
using memgraph::msgs::ScanVerticesRequest;
using memgraph::msgs::ScanVerticesResponse;
using memgraph::msgs::Value;
using memgraph::msgs::VertexId;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyValue;
using ShardRsmKey = std::vector<memgraph::storage::v3::PropertyValue>;
class MockedShardRsm {
std::map<ShardRsmKey, int> state_;
ShardRsmKey minimum_key_;
std::optional<ShardRsmKey> maximum_key_{std::nullopt};
Hlc shard_map_version_;
// The key is not located in this shard
bool IsKeyInRange(const ShardRsmKey &key) {
if (maximum_key_) [[likely]] {
return (key >= minimum_key_ && key <= maximum_key_);
}
return key >= minimum_key_;
}
public:
// ExpandOneResponse Read(ExpandOneRequest rqst);
// GetPropertiesResponse Read(GetPropertiesRequest rqst);
ScanVerticesResponse ReadImpl(ScanVerticesRequest rqst) {
ScanVerticesResponse ret;
auto as_prop_val = memgraph::storage::conversions::ConvertPropertyVector(rqst.start_id.second);
if (!IsKeyInRange(as_prop_val)) {
ret.success = false;
} else if (as_prop_val == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) {
Value val(int64_t(0));
ret.next_start_id = std::make_optional<VertexId>();
ret.next_start_id->second =
memgraph::storage::conversions::ConvertValueVector(ShardRsmKey{PropertyValue(1), PropertyValue(0)});
memgraph::msgs::ScanResultRow result;
result.props.push_back(std::make_pair(memgraph::msgs::PropertyId::FromUint(0), val));
ret.results.push_back(std::move(result));
ret.success = true;
} else if (as_prop_val == ShardRsmKey{PropertyValue(1), PropertyValue(0)}) {
memgraph::msgs::ScanResultRow result;
Value val(int64_t(1));
result.props.push_back(std::make_pair(memgraph::msgs::PropertyId::FromUint(0), val));
ret.results.push_back(std::move(result));
ret.success = true;
} else if (as_prop_val == ShardRsmKey{PropertyValue(12), PropertyValue(13)}) {
memgraph::msgs::ScanResultRow result;
Value val(int64_t(444));
result.props.push_back(std::make_pair(memgraph::msgs::PropertyId::FromUint(0), val));
ret.results.push_back(std::move(result));
ret.success = true;
} else {
ret.success = false;
}
return ret;
}
ExpandOneResponse ReadImpl(ExpandOneRequest rqst) { return {}; }
using ReadRequests = std::variant<ScanVerticesRequest, ExpandOneRequest>;
using ReadResponses = std::variant<ScanVerticesResponse, ExpandOneResponse>;
ReadResponses Read(ReadRequests read_requests) {
return {std::visit([this](auto &&request) { return ReadResponses{ReadImpl(std::move(request))}; },
std::move(read_requests))};
}
CreateVerticesResponse Apply(CreateVerticesRequest request) { return CreateVerticesResponse{.success = true}; }
};

View File

@ -0,0 +1,320 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <chrono>
#include <deque>
#include <iostream>
#include <map>
#include <optional>
#include <set>
#include <thread>
#include <vector>
#include "common.hpp"
#include "common/types.hpp"
#include "coordinator/coordinator_client.hpp"
#include "coordinator/coordinator_rsm.hpp"
#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/rsm/raft.hpp"
#include "io/rsm/rsm_client.hpp"
#include "io/rsm/shard_rsm.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "query/v2/accessors.hpp"
#include "query/v2/conversions.hpp"
#include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/property_value.hpp"
#include "utils/result.hpp"
using memgraph::coordinator::AddressAndStatus;
using CompoundKey = memgraph::coordinator::PrimaryKey;
using memgraph::coordinator::Coordinator;
using memgraph::coordinator::CoordinatorClient;
using memgraph::coordinator::CoordinatorRsm;
using memgraph::coordinator::HlcRequest;
using memgraph::coordinator::HlcResponse;
using memgraph::coordinator::Shard;
using memgraph::coordinator::ShardMap;
using memgraph::coordinator::Shards;
using memgraph::coordinator::Status;
using memgraph::io::Address;
using memgraph::io::Io;
using memgraph::io::ResponseEnvelope;
using memgraph::io::ResponseFuture;
using memgraph::io::Time;
using memgraph::io::TimedOut;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::ReadResponse;
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageReadResponse;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::msgs::CreateVerticesRequest;
using memgraph::msgs::CreateVerticesResponse;
using memgraph::msgs::ListedValues;
using memgraph::msgs::NewVertexLabel;
using memgraph::msgs::ScanVerticesRequest;
using memgraph::msgs::ScanVerticesResponse;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::SchemaProperty;
using memgraph::utils::BasicResult;
namespace {
ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::coordinator::Address a_io_2,
memgraph::coordinator::Address a_io_3, memgraph::coordinator::Address b_io_1,
memgraph::coordinator::Address b_io_2, memgraph::coordinator::Address b_io_3) {
static const std::string label_name = std::string("test_label");
ShardMap sm;
// register new properties
const std::vector<std::string> property_names = {"property_1", "property_2"};
const auto properties = sm.AllocatePropertyIds(property_names);
const auto property_id_1 = properties.at("property_1");
const auto property_id_2 = properties.at("property_2");
const auto type_1 = memgraph::common::SchemaType::INT;
const auto type_2 = memgraph::common::SchemaType::INT;
// register new label space
std::vector<SchemaProperty> schema = {
SchemaProperty{.property_id = property_id_1, .type = type_1},
SchemaProperty{.property_id = property_id_2, .type = type_2},
};
auto label_success = sm.InitializeNewLabel(label_name, schema, 1, sm.shard_map_version);
MG_ASSERT(label_success);
const LabelId label_id = sm.labels.at(label_name);
auto &label_space = sm.label_spaces.at(label_id);
Shards &shards_for_label = label_space.shards;
// add first shard at [0, 0]
AddressAndStatus aas1_1{.address = a_io_1, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas1_2{.address = a_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas1_3{.address = a_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard1 = {aas1_1, aas1_2, aas1_3};
auto key1 = memgraph::storage::v3::PropertyValue(0);
auto key2 = memgraph::storage::v3::PropertyValue(0);
CompoundKey compound_key_1 = {key1, key2};
shards_for_label[compound_key_1] = shard1;
// add second shard at [12, 13]
AddressAndStatus aas2_1{.address = b_io_1, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas2_2{.address = b_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas2_3{.address = b_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard2 = {aas2_1, aas2_2, aas2_3};
auto key3 = memgraph::storage::v3::PropertyValue(12);
auto key4 = memgraph::storage::v3::PropertyValue(13);
CompoundKey compound_key_2 = {key3, key4};
shards_for_label[compound_key_2] = shard2;
return sm;
}
} // namespace
using WriteRequests = CreateVerticesRequest;
using WriteResponses = CreateVerticesResponse;
using ReadRequests = std::variant<ScanVerticesRequest, ExpandOneRequest>;
using ReadResponses = std::variant<ScanVerticesResponse, ExpandOneResponse>;
using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>;
using ConcreteStorageRsm =
Raft<SimulatorTransport, MockedShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
template <typename IoImpl>
void RunStorageRaft(Raft<IoImpl, MockedShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses> server) {
server.Run();
}
template <typename ShardRequestManager>
void TestScanAll(ShardRequestManager &io) {
memgraph::msgs::ExecutionState<ScanVerticesRequest> state{.label = "test_label"};
auto result = io.Request(state);
MG_ASSERT(result.size() == 2);
{
auto prop = result[0].GetProperty(memgraph::msgs::PropertyId::FromUint(0));
MG_ASSERT(prop.int_v == 0);
prop = result[1].GetProperty(memgraph::msgs::PropertyId::FromUint(0));
MG_ASSERT(prop.int_v == 444);
}
result = io.Request(state);
{
MG_ASSERT(result.size() == 1);
auto prop = result[0].GetProperty(memgraph::msgs::PropertyId::FromUint(0));
MG_ASSERT(prop.int_v == 1);
}
// Exhaust it, request should be empty
result = io.Request(state);
MG_ASSERT(result.size() == 0);
}
template <typename ShardRequestManager>
void TestCreateVertices(ShardRequestManager &io) {
using PropVal = memgraph::msgs::Value;
memgraph::msgs::ExecutionState<CreateVerticesRequest> state;
std::vector<memgraph::msgs::NewVertex> new_vertices;
auto label_id = io.LabelNameToLabelId("test_label");
memgraph::msgs::NewVertex a1{.primary_key = {PropVal(int64_t(1)), PropVal(int64_t(0))}};
a1.label_ids.push_back({label_id});
memgraph::msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}};
a2.label_ids.push_back({label_id});
new_vertices.push_back(std::move(a1));
new_vertices.push_back(std::move(a2));
auto result = io.Request(state, std::move(new_vertices));
MG_ASSERT(result.size() == 2);
}
template <typename ShardRequestManager>
void TestExpand(ShardRequestManager &io) {}
template <typename ShardRequestManager>
void TestAggregate(ShardRequestManager &io) {}
int main() {
SimulatorConfig config{
.drop_percent = 0,
.perform_timeouts = false,
.scramble_messages = false,
.rng_seed = 0,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024},
};
auto simulator = Simulator(config);
const auto one_second = std::chrono::seconds(1);
Io<SimulatorTransport> cli_io = simulator.RegisterNew();
cli_io.SetDefaultTimeout(one_second);
// Register
Io<SimulatorTransport> a_io_1 = simulator.RegisterNew();
a_io_1.SetDefaultTimeout(one_second);
Io<SimulatorTransport> a_io_2 = simulator.RegisterNew();
a_io_2.SetDefaultTimeout(one_second);
Io<SimulatorTransport> a_io_3 = simulator.RegisterNew();
a_io_3.SetDefaultTimeout(one_second);
Io<SimulatorTransport> b_io_1 = simulator.RegisterNew();
b_io_1.SetDefaultTimeout(one_second);
Io<SimulatorTransport> b_io_2 = simulator.RegisterNew();
b_io_2.SetDefaultTimeout(one_second);
Io<SimulatorTransport> b_io_3 = simulator.RegisterNew();
b_io_3.SetDefaultTimeout(one_second);
// Preconfigure coordinator with kv shard 'A' and 'B'
auto sm1 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress());
auto sm2 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress());
auto sm3 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress());
// Spin up shard A
std::vector<Address> a_addrs = {a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress()};
std::vector<Address> a_1_peers = {a_addrs[1], a_addrs[2]};
std::vector<Address> a_2_peers = {a_addrs[0], a_addrs[2]};
std::vector<Address> a_3_peers = {a_addrs[0], a_addrs[1]};
ConcreteStorageRsm a_1{std::move(a_io_1), a_1_peers, MockedShardRsm{}};
ConcreteStorageRsm a_2{std::move(a_io_2), a_2_peers, MockedShardRsm{}};
ConcreteStorageRsm a_3{std::move(a_io_3), a_3_peers, MockedShardRsm{}};
auto a_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_1));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[0]);
auto a_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_2));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[1]);
auto a_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_3));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[2]);
// Spin up shard B
std::vector<Address> b_addrs = {b_io_1.GetAddress(), b_io_2.GetAddress(), b_io_3.GetAddress()};
std::vector<Address> b_1_peers = {b_addrs[1], b_addrs[2]};
std::vector<Address> b_2_peers = {b_addrs[0], b_addrs[2]};
std::vector<Address> b_3_peers = {b_addrs[0], b_addrs[1]};
ConcreteStorageRsm b_1{std::move(b_io_1), b_1_peers, MockedShardRsm{}};
ConcreteStorageRsm b_2{std::move(b_io_2), b_2_peers, MockedShardRsm{}};
ConcreteStorageRsm b_3{std::move(b_io_3), b_3_peers, MockedShardRsm{}};
auto b_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_1));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[0]);
auto b_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_2));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[1]);
auto b_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_3));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[2]);
// Spin up coordinators
Io<SimulatorTransport> c_io_1 = simulator.RegisterNew();
c_io_1.SetDefaultTimeout(one_second);
Io<SimulatorTransport> c_io_2 = simulator.RegisterNew();
c_io_2.SetDefaultTimeout(one_second);
Io<SimulatorTransport> c_io_3 = simulator.RegisterNew();
c_io_3.SetDefaultTimeout(one_second);
std::vector<Address> c_addrs = {c_io_1.GetAddress(), c_io_2.GetAddress(), c_io_3.GetAddress()};
std::vector<Address> c_1_peers = {c_addrs[1], c_addrs[2]};
std::vector<Address> c_2_peers = {c_addrs[0], c_addrs[2]};
std::vector<Address> c_3_peers = {c_addrs[0], c_addrs[1]};
ConcreteCoordinatorRsm c_1{std::move(c_io_1), c_1_peers, Coordinator{(sm1)}};
ConcreteCoordinatorRsm c_2{std::move(c_io_2), c_2_peers, Coordinator{(sm2)}};
ConcreteCoordinatorRsm c_3{std::move(c_io_3), c_3_peers, Coordinator{(sm3)}};
auto c_thread_1 = std::jthread([c_1]() mutable { c_1.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[0]);
auto c_thread_2 = std::jthread([c_2]() mutable { c_2.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[1]);
auto c_thread_3 = std::jthread([c_3]() mutable { c_3.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[2]);
std::cout << "beginning test after servers have become quiescent" << std::endl;
// Have client contact coordinator RSM for a new transaction ID and
// also get the current shard map
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
memgraph::msgs::ShardRequestManager<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io));
io.StartTransaction();
TestScanAll(io);
TestCreateVertices(io);
simulator.ShutDown();
return 0;
}

View File

@ -331,6 +331,41 @@ target_link_libraries(${test_prefix}storage_v3_expr mg-storage-v3 mg-expr)
add_unit_test(storage_v3_schema.cpp)
target_link_libraries(${test_prefix}storage_v3_schema mg-storage-v3)
# Test mg-query-v2
# These are commented out because of the new TypedValue in the query engine
#add_unit_test(query_v2_interpreter.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp)
#target_link_libraries(${test_prefix}query_v2_interpreter mg-storage-v3 mg-query-v2 mg-communication)
#
#add_unit_test(query_v2_query_plan_accumulate_aggregate.cpp)
#target_link_libraries(${test_prefix}query_v2_query_plan_accumulate_aggregate mg-query-v2)
#
#add_unit_test(query_v2_query_plan_create_set_remove_delete.cpp)
#target_link_libraries(${test_prefix}query_v2_query_plan_create_set_remove_delete mg-query-v2 mg-expr)
#
#add_unit_test(query_v2_query_plan_bag_semantics.cpp)
#target_link_libraries(${test_prefix}query_v2_query_plan_bag_semantics mg-query-v2)
#
#add_unit_test(query_v2_query_plan_edge_cases.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp)
#target_link_libraries(${test_prefix}query_v2_query_plan_edge_cases mg-communication mg-query-v2)
#
#add_unit_test(query_v2_query_plan_v2_create_set_remove_delete.cpp)
#target_link_libraries(${test_prefix}query_v2_query_plan_v2_create_set_remove_delete mg-query-v2)
#
#add_unit_test(query_v2_query_plan_match_filter_return.cpp)
#target_link_libraries(${test_prefix}query_v2_query_plan_match_filter_return mg-query-v2)
#
#add_unit_test(query_v2_cypher_main_visitor.cpp)
#target_link_libraries(${test_prefix}query_v2_cypher_main_visitor mg-query-v2)
#
#add_unit_test(query_v2_query_required_privileges.cpp)
#target_link_libraries(${test_prefix}query_v2_query_required_privileges mg-query-v2)
#
#add_unit_test(replication_persistence_helper.cpp)
#target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2)
add_unit_test(query_v2_dummy_test.cpp)
target_link_libraries(${test_prefix}query_v2_dummy_test mg-query-v2)
add_unit_test(storage_v3_property_store.cpp)
target_link_libraries(${test_prefix}storage_v3_property_store mg-storage-v3 fmt)
@ -349,33 +384,6 @@ target_link_libraries(${test_prefix}storage_v3_edge mg-storage-v3)
add_unit_test(storage_v3_isolation_level.cpp)
target_link_libraries(${test_prefix}storage_v3_isolation_level mg-storage-v3)
# Test mg-query-v2
add_unit_test(query_v2_interpreter.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp)
target_link_libraries(${test_prefix}query_v2_interpreter mg-storage-v3 mg-query-v2 mg-communication)
# add_unit_test(query_v2_query_plan_accumulate_aggregate.cpp)
# target_link_libraries(${test_prefix}query_v2_query_plan_accumulate_aggregate mg-query-v2)
# # add_unit_test(query_v2_query_plan_create_set_remove_delete.cpp)
# # target_link_libraries(${test_prefix}query_v2_query_plan_create_set_remove_delete mg-query-v2 mg-expr)
# add_unit_test(query_v2_query_plan_bag_semantics.cpp)
# target_link_libraries(${test_prefix}query_v2_query_plan_bag_semantics mg-query-v2)
# add_unit_test(query_v2_query_plan_edge_cases.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp)
# target_link_libraries(${test_prefix}query_v2_query_plan_edge_cases mg-communication mg-query-v2)
# add_unit_test(query_v2_query_plan_v2_create_set_remove_delete.cpp)
# target_link_libraries(${test_prefix}query_v2_query_plan_v2_create_set_remove_delete mg-query-v2)
# add_unit_test(query_v2_query_plan_match_filter_return.cpp)
# target_link_libraries(${test_prefix}query_v2_query_plan_match_filter_return mg-query-v2)
# add_unit_test(query_v2_cypher_main_visitor.cpp)
# target_link_libraries(${test_prefix}query_v2_cypher_main_visitor mg-query-v2)
# add_unit_test(query_v2_query_required_privileges.cpp)
# target_link_libraries(${test_prefix}query_v2_query_required_privileges mg-query-v2)
add_unit_test(replication_persistence_helper.cpp)
target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2)

View File

@ -0,0 +1,36 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <iterator>
#include <memory>
#include <variant>
#include <vector>
#include "common/types.hpp"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "query/v2/bindings/frame.hpp"
#include "query/v2/bindings/typed_value.hpp"
#include "query/v2/context.hpp"
#include "query/v2/db_accessor.hpp"
#include "query/v2/exceptions.hpp"
#include "query/v2/plan/operator.hpp"
#include "query/v2/plan/operator.hpp"
#include "query_v2_query_plan_common.hpp"
class Dummy : public testing::Test {
protected:
void SetUp() override {}
};
TEST_F(Dummy, DummyTest) { ASSERT_EQ(true, true); }

View File

@ -130,8 +130,8 @@ namespace memgraph::storage::v3::test {
class ExpressionEvaluatorTest : public ::testing::Test {
protected:
LabelId primary_label{LabelId::FromInt(0)};
PropertyId primary_property{PropertyId::FromInt(1)};
LabelId primary_label{LabelId::FromInt(1)};
PropertyId primary_property{PropertyId::FromInt(2)};
PrimaryKey min_pk{PropertyValue(0)};
Shard db{primary_label, min_pk, std::nullopt};