[🍍 < T1086-MG] Test distributed operators e2e (#607)
* Fix Explain queries * Add Vertex/Edge accessor support for properties * Fix projections * Fix expansions to fetch destination vertex properties * Fix improper use of ShardMap on bolt and replaced it with the ShardRequestManager * Add NameToId mappers on ShardRequestManager * Add e2e tests for operators * Fix OPTIONAL MATCH
This commit is contained in:
parent
eafccaea84
commit
59c7d81ae8
9
.github/workflows/diff.yaml
vendored
9
.github/workflows/diff.yaml
vendored
@ -219,3 +219,12 @@ jobs:
|
|||||||
# Run simulation tests.
|
# Run simulation tests.
|
||||||
cd build
|
cd build
|
||||||
ctest -R memgraph__simulation --output-on-failure -j$THREADS
|
ctest -R memgraph__simulation --output-on-failure -j$THREADS
|
||||||
|
|
||||||
|
- name: Run e2e tests
|
||||||
|
run: |
|
||||||
|
# TODO(gitbuda): Setup mgclient and pymgclient properly.
|
||||||
|
cd tests
|
||||||
|
./setup.sh
|
||||||
|
source ve3/bin/activate
|
||||||
|
cd e2e
|
||||||
|
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-root-directory ./distributed_queries
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
// by the Apache License, Version 2.0, included in the file
|
// by the Apache License, Version 2.0, included in the file
|
||||||
// licenses/APL.txt.
|
// licenses/APL.txt.
|
||||||
|
|
||||||
|
#include <optional>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
@ -365,7 +366,6 @@ std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const {
|
|||||||
if (const auto it = labels.find(label); it != labels.end()) {
|
if (const auto it = labels.find(label); it != labels.end()) {
|
||||||
return it->second;
|
return it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -382,7 +382,6 @@ std::optional<PropertyId> ShardMap::GetPropertyId(const std::string &property_na
|
|||||||
if (const auto it = properties.find(property_name); it != properties.end()) {
|
if (const auto it = properties.find(property_name); it != properties.end()) {
|
||||||
return it->second;
|
return it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -399,7 +398,6 @@ std::optional<EdgeTypeId> ShardMap::GetEdgeTypeId(const std::string &edge_type)
|
|||||||
if (const auto it = edge_types.find(edge_type); it != edge_types.end()) {
|
if (const auto it = edge_types.find(edge_type); it != edge_types.end()) {
|
||||||
return it->second;
|
return it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -411,6 +409,7 @@ const std::string &ShardMap::GetEdgeTypeName(const EdgeTypeId property) const {
|
|||||||
}
|
}
|
||||||
throw utils::BasicException("EdgeTypeId not found!");
|
throw utils::BasicException("EdgeTypeId not found!");
|
||||||
}
|
}
|
||||||
|
|
||||||
Shards ShardMap::GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key,
|
Shards ShardMap::GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key,
|
||||||
const PrimaryKey &end_key) const {
|
const PrimaryKey &end_key) const {
|
||||||
MG_ASSERT(start_key <= end_key);
|
MG_ASSERT(start_key <= end_key);
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
#include "io/address.hpp"
|
#include "io/address.hpp"
|
||||||
#include "storage/v3/config.hpp"
|
#include "storage/v3/config.hpp"
|
||||||
#include "storage/v3/id_types.hpp"
|
#include "storage/v3/id_types.hpp"
|
||||||
|
#include "storage/v3/name_id_mapper.hpp"
|
||||||
#include "storage/v3/property_value.hpp"
|
#include "storage/v3/property_value.hpp"
|
||||||
#include "storage/v3/schemas.hpp"
|
#include "storage/v3/schemas.hpp"
|
||||||
#include "storage/v3/temporal.hpp"
|
#include "storage/v3/temporal.hpp"
|
||||||
@ -120,9 +121,9 @@ struct ShardMap {
|
|||||||
std::map<PropertyName, PropertyId> properties;
|
std::map<PropertyName, PropertyId> properties;
|
||||||
std::map<EdgeTypeName, EdgeTypeId> edge_types;
|
std::map<EdgeTypeName, EdgeTypeId> edge_types;
|
||||||
uint64_t max_label_id{kNotExistingId};
|
uint64_t max_label_id{kNotExistingId};
|
||||||
std::map<LabelName, LabelId> labels;
|
|
||||||
std::map<LabelId, LabelSpace> label_spaces;
|
std::map<LabelId, LabelSpace> label_spaces;
|
||||||
std::map<LabelId, std::vector<SchemaProperty>> schemas;
|
std::map<LabelId, std::vector<SchemaProperty>> schemas;
|
||||||
|
std::map<LabelName, LabelId> labels;
|
||||||
|
|
||||||
[[nodiscard]] static ShardMap Parse(std::istream &input_stream);
|
[[nodiscard]] static ShardMap Parse(std::istream &input_stream);
|
||||||
friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map);
|
friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map);
|
||||||
|
@ -13,7 +13,7 @@
|
|||||||
#ifndef MG_AST_INCLUDE_PATH
|
#ifndef MG_AST_INCLUDE_PATH
|
||||||
#ifdef MG_CLANG_TIDY_CHECK
|
#ifdef MG_CLANG_TIDY_CHECK
|
||||||
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
|
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
|
||||||
#define MG_AST_INCLUDE_PATH "query/v2/frontend/ast/ast.hpp"
|
#include "query/v2/bindings/bindings.hpp"
|
||||||
#else
|
#else
|
||||||
#error Missing AST include path
|
#error Missing AST include path
|
||||||
#endif
|
#endif
|
||||||
@ -21,8 +21,6 @@
|
|||||||
|
|
||||||
#ifndef MG_INJECTED_NAMESPACE_NAME
|
#ifndef MG_INJECTED_NAMESPACE_NAME
|
||||||
#ifdef MG_CLANG_TIDY_CHECK
|
#ifdef MG_CLANG_TIDY_CHECK
|
||||||
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
|
|
||||||
#define MG_INJECTED_NAMESPACE_NAME memgraph::query::v2
|
|
||||||
#else
|
#else
|
||||||
#error Missing AST namespace
|
#error Missing AST namespace
|
||||||
#endif
|
#endif
|
||||||
|
@ -718,7 +718,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
|
|||||||
TReturnType GetProperty(const TRecordAccessor &record_accessor, PropertyIx prop) {
|
TReturnType GetProperty(const TRecordAccessor &record_accessor, PropertyIx prop) {
|
||||||
auto maybe_prop = record_accessor.GetProperty(prop.name);
|
auto maybe_prop = record_accessor.GetProperty(prop.name);
|
||||||
// Handler non existent property
|
// Handler non existent property
|
||||||
return conv_(maybe_prop);
|
return conv_(maybe_prop, dba_);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class TRecordAccessor, class TTag = Tag,
|
template <class TRecordAccessor, class TTag = Tag,
|
||||||
@ -726,7 +726,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
|
|||||||
TReturnType GetProperty(const TRecordAccessor &record_accessor, const std::string_view name) {
|
TReturnType GetProperty(const TRecordAccessor &record_accessor, const std::string_view name) {
|
||||||
auto maybe_prop = record_accessor.GetProperty(std::string(name));
|
auto maybe_prop = record_accessor.GetProperty(std::string(name));
|
||||||
// Handler non existent property
|
// Handler non existent property
|
||||||
return conv_(maybe_prop);
|
return conv_(maybe_prop, dba_);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class TRecordAccessor, class TTag = Tag,
|
template <class TRecordAccessor, class TTag = Tag,
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
#include "coordinator/shard_map.hpp"
|
#include "coordinator/shard_map.hpp"
|
||||||
#include "query/v2/accessors.hpp"
|
#include "query/v2/accessors.hpp"
|
||||||
#include "query/v2/requests.hpp"
|
#include "query/v2/requests.hpp"
|
||||||
|
#include "query/v2/shard_request_manager.hpp"
|
||||||
#include "storage/v3/edge_accessor.hpp"
|
#include "storage/v3/edge_accessor.hpp"
|
||||||
#include "storage/v3/id_types.hpp"
|
#include "storage/v3/id_types.hpp"
|
||||||
#include "storage/v3/result.hpp"
|
#include "storage/v3/result.hpp"
|
||||||
@ -70,51 +71,52 @@ query::v2::TypedValue ToTypedValue(const Value &value) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const query::v2::accessors::VertexAccessor &vertex,
|
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(
|
||||||
const coordinator::ShardMap &shard_map,
|
const query::v2::accessors::VertexAccessor &vertex, const msgs::ShardRequestManagerInterface *shard_request_manager,
|
||||||
storage::v3::View /*view*/) {
|
storage::v3::View /*view*/) {
|
||||||
auto id = communication::bolt::Id::FromUint(0);
|
auto id = communication::bolt::Id::FromUint(0);
|
||||||
|
|
||||||
auto labels = vertex.Labels();
|
auto labels = vertex.Labels();
|
||||||
std::vector<std::string> new_labels;
|
std::vector<std::string> new_labels;
|
||||||
new_labels.reserve(labels.size());
|
new_labels.reserve(labels.size());
|
||||||
for (const auto &label : labels) {
|
for (const auto &label : labels) {
|
||||||
new_labels.push_back(shard_map.GetLabelName(label.id));
|
new_labels.push_back(shard_request_manager->LabelToName(label.id));
|
||||||
}
|
}
|
||||||
|
|
||||||
auto properties = vertex.Properties();
|
auto properties = vertex.Properties();
|
||||||
std::map<std::string, Value> new_properties;
|
std::map<std::string, Value> new_properties;
|
||||||
for (const auto &[prop, property_value] : properties) {
|
for (const auto &[prop, property_value] : properties) {
|
||||||
new_properties[shard_map.GetPropertyName(prop)] = ToBoltValue(property_value);
|
new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value);
|
||||||
}
|
}
|
||||||
return communication::bolt::Vertex{id, new_labels, new_properties};
|
return communication::bolt::Vertex{id, new_labels, new_properties};
|
||||||
}
|
}
|
||||||
|
|
||||||
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const query::v2::accessors::EdgeAccessor &edge,
|
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(
|
||||||
const coordinator::ShardMap &shard_map,
|
const query::v2::accessors::EdgeAccessor &edge, const msgs::ShardRequestManagerInterface *shard_request_manager,
|
||||||
storage::v3::View /*view*/) {
|
storage::v3::View /*view*/) {
|
||||||
// TODO(jbajic) Fix bolt communication
|
// TODO(jbajic) Fix bolt communication
|
||||||
auto id = communication::bolt::Id::FromUint(0);
|
auto id = communication::bolt::Id::FromUint(0);
|
||||||
auto from = communication::bolt::Id::FromUint(0);
|
auto from = communication::bolt::Id::FromUint(0);
|
||||||
auto to = communication::bolt::Id::FromUint(0);
|
auto to = communication::bolt::Id::FromUint(0);
|
||||||
const auto &type = shard_map.GetEdgeTypeName(edge.EdgeType());
|
const auto &type = shard_request_manager->EdgeTypeToName(edge.EdgeType());
|
||||||
|
|
||||||
auto properties = edge.Properties();
|
auto properties = edge.Properties();
|
||||||
std::map<std::string, Value> new_properties;
|
std::map<std::string, Value> new_properties;
|
||||||
for (const auto &[prop, property_value] : properties) {
|
for (const auto &[prop, property_value] : properties) {
|
||||||
new_properties[shard_map.GetPropertyName(prop)] = ToBoltValue(property_value);
|
new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value);
|
||||||
}
|
}
|
||||||
return communication::bolt::Edge{id, from, to, type, new_properties};
|
return communication::bolt::Edge{id, from, to, type, new_properties};
|
||||||
}
|
}
|
||||||
|
|
||||||
storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path & /*edge*/,
|
storage::v3::Result<communication::bolt::Path> ToBoltPath(
|
||||||
const coordinator::ShardMap & /*shard_map*/,
|
const query::v2::accessors::Path & /*edge*/, const msgs::ShardRequestManagerInterface * /*shard_request_manager*/,
|
||||||
storage::v3::View /*view*/) {
|
storage::v3::View /*view*/) {
|
||||||
// TODO(jbajic) Fix bolt communication
|
// TODO(jbajic) Fix bolt communication
|
||||||
return {storage::v3::Error::DELETED_OBJECT};
|
return {storage::v3::Error::DELETED_OBJECT};
|
||||||
}
|
}
|
||||||
|
|
||||||
storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const coordinator::ShardMap &shard_map,
|
storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value,
|
||||||
|
const msgs::ShardRequestManagerInterface *shard_request_manager,
|
||||||
storage::v3::View view) {
|
storage::v3::View view) {
|
||||||
switch (value.type()) {
|
switch (value.type()) {
|
||||||
case query::v2::TypedValue::Type::Null:
|
case query::v2::TypedValue::Type::Null:
|
||||||
@ -131,7 +133,7 @@ storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const
|
|||||||
std::vector<Value> values;
|
std::vector<Value> values;
|
||||||
values.reserve(value.ValueList().size());
|
values.reserve(value.ValueList().size());
|
||||||
for (const auto &v : value.ValueList()) {
|
for (const auto &v : value.ValueList()) {
|
||||||
auto maybe_value = ToBoltValue(v, shard_map, view);
|
auto maybe_value = ToBoltValue(v, shard_request_manager, view);
|
||||||
if (maybe_value.HasError()) return maybe_value.GetError();
|
if (maybe_value.HasError()) return maybe_value.GetError();
|
||||||
values.emplace_back(std::move(*maybe_value));
|
values.emplace_back(std::move(*maybe_value));
|
||||||
}
|
}
|
||||||
@ -140,24 +142,24 @@ storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const
|
|||||||
case query::v2::TypedValue::Type::Map: {
|
case query::v2::TypedValue::Type::Map: {
|
||||||
std::map<std::string, Value> map;
|
std::map<std::string, Value> map;
|
||||||
for (const auto &kv : value.ValueMap()) {
|
for (const auto &kv : value.ValueMap()) {
|
||||||
auto maybe_value = ToBoltValue(kv.second, shard_map, view);
|
auto maybe_value = ToBoltValue(kv.second, shard_request_manager, view);
|
||||||
if (maybe_value.HasError()) return maybe_value.GetError();
|
if (maybe_value.HasError()) return maybe_value.GetError();
|
||||||
map.emplace(kv.first, std::move(*maybe_value));
|
map.emplace(kv.first, std::move(*maybe_value));
|
||||||
}
|
}
|
||||||
return Value(std::move(map));
|
return Value(std::move(map));
|
||||||
}
|
}
|
||||||
case query::v2::TypedValue::Type::Vertex: {
|
case query::v2::TypedValue::Type::Vertex: {
|
||||||
auto maybe_vertex = ToBoltVertex(value.ValueVertex(), shard_map, view);
|
auto maybe_vertex = ToBoltVertex(value.ValueVertex(), shard_request_manager, view);
|
||||||
if (maybe_vertex.HasError()) return maybe_vertex.GetError();
|
if (maybe_vertex.HasError()) return maybe_vertex.GetError();
|
||||||
return Value(std::move(*maybe_vertex));
|
return Value(std::move(*maybe_vertex));
|
||||||
}
|
}
|
||||||
case query::v2::TypedValue::Type::Edge: {
|
case query::v2::TypedValue::Type::Edge: {
|
||||||
auto maybe_edge = ToBoltEdge(value.ValueEdge(), shard_map, view);
|
auto maybe_edge = ToBoltEdge(value.ValueEdge(), shard_request_manager, view);
|
||||||
if (maybe_edge.HasError()) return maybe_edge.GetError();
|
if (maybe_edge.HasError()) return maybe_edge.GetError();
|
||||||
return Value(std::move(*maybe_edge));
|
return Value(std::move(*maybe_edge));
|
||||||
}
|
}
|
||||||
case query::v2::TypedValue::Type::Path: {
|
case query::v2::TypedValue::Type::Path: {
|
||||||
auto maybe_path = ToBoltPath(value.ValuePath(), shard_map, view);
|
auto maybe_path = ToBoltPath(value.ValuePath(), shard_request_manager, view);
|
||||||
if (maybe_path.HasError()) return maybe_path.GetError();
|
if (maybe_path.HasError()) return maybe_path.GetError();
|
||||||
return Value(std::move(*maybe_path));
|
return Value(std::move(*maybe_path));
|
||||||
}
|
}
|
||||||
@ -209,12 +211,6 @@ Value ToBoltValue(msgs::Value value) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path & /*path*/,
|
|
||||||
const storage::v3::Shard & /*db*/,
|
|
||||||
storage::v3::View /*view*/) {
|
|
||||||
return communication::bolt::Path();
|
|
||||||
}
|
|
||||||
|
|
||||||
storage::v3::PropertyValue ToPropertyValue(const Value &value) {
|
storage::v3::PropertyValue ToPropertyValue(const Value &value) {
|
||||||
switch (value.type()) {
|
switch (value.type()) {
|
||||||
case Value::Type::Null:
|
case Value::Type::Null:
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
#include "communication/bolt/v1/value.hpp"
|
#include "communication/bolt/v1/value.hpp"
|
||||||
#include "coordinator/shard_map.hpp"
|
#include "coordinator/shard_map.hpp"
|
||||||
#include "query/v2/bindings/typed_value.hpp"
|
#include "query/v2/bindings/typed_value.hpp"
|
||||||
|
#include "query/v2/shard_request_manager.hpp"
|
||||||
#include "storage/v3/property_value.hpp"
|
#include "storage/v3/property_value.hpp"
|
||||||
#include "storage/v3/result.hpp"
|
#include "storage/v3/result.hpp"
|
||||||
#include "storage/v3/shard.hpp"
|
#include "storage/v3/shard.hpp"
|
||||||
@ -30,40 +31,40 @@ namespace memgraph::glue::v2 {
|
|||||||
|
|
||||||
/// @param storage::v3::VertexAccessor for converting to
|
/// @param storage::v3::VertexAccessor for converting to
|
||||||
/// communication::bolt::Vertex.
|
/// communication::bolt::Vertex.
|
||||||
/// @param coordinator::ShardMap shard_map getting label and property names.
|
/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting label and property names.
|
||||||
/// @param storage::v3::View for deciding which vertex attributes are visible.
|
/// @param storage::v3::View for deciding which vertex attributes are visible.
|
||||||
///
|
///
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const storage::v3::VertexAccessor &vertex,
|
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(
|
||||||
const coordinator::ShardMap &shard_map,
|
const storage::v3::VertexAccessor &vertex, const msgs::ShardRequestManagerInterface *shard_request_manager,
|
||||||
storage::v3::View view);
|
storage::v3::View view);
|
||||||
|
|
||||||
/// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge.
|
/// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge.
|
||||||
/// @param coordinator::ShardMap shard_map getting edge type and property names.
|
/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting edge type and property names.
|
||||||
/// @param storage::v3::View for deciding which edge attributes are visible.
|
/// @param storage::v3::View for deciding which edge attributes are visible.
|
||||||
///
|
///
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const storage::v3::EdgeAccessor &edge,
|
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(
|
||||||
const coordinator::ShardMap &shard_map,
|
const storage::v3::EdgeAccessor &edge, const msgs::ShardRequestManagerInterface *shard_request_manager,
|
||||||
storage::v3::View view);
|
storage::v3::View view);
|
||||||
|
|
||||||
/// @param query::v2::Path for converting to communication::bolt::Path.
|
/// @param query::v2::Path for converting to communication::bolt::Path.
|
||||||
/// @param coordinator::ShardMap shard_map ToBoltVertex and ToBoltEdge.
|
/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
|
||||||
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
|
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
|
||||||
///
|
///
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path &path,
|
storage::v3::Result<communication::bolt::Path> ToBoltPath(
|
||||||
const coordinator::ShardMap &shard_map,
|
const query::v2::accessors::Path &path, const msgs::ShardRequestManagerInterface *shard_request_manager,
|
||||||
storage::v3::View view);
|
storage::v3::View view);
|
||||||
|
|
||||||
/// @param query::v2::TypedValue for converting to communication::bolt::Value.
|
/// @param query::v2::TypedValue for converting to communication::bolt::Value.
|
||||||
/// @param coordinator::ShardMap shard_map ToBoltVertex and ToBoltEdge.
|
/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
|
||||||
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
|
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
|
||||||
///
|
///
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
storage::v3::Result<communication::bolt::Value> ToBoltValue(const query::v2::TypedValue &value,
|
storage::v3::Result<communication::bolt::Value> ToBoltValue(
|
||||||
const coordinator::ShardMap &shard_map,
|
const query::v2::TypedValue &value, const msgs::ShardRequestManagerInterface *shard_request_manager,
|
||||||
storage::v3::View view);
|
storage::v3::View view);
|
||||||
|
|
||||||
query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value);
|
query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value);
|
||||||
|
|
||||||
@ -73,7 +74,8 @@ storage::v3::PropertyValue ToPropertyValue(const communication::bolt::Value &val
|
|||||||
|
|
||||||
communication::bolt::Value ToBoltValue(msgs::Value value);
|
communication::bolt::Value ToBoltValue(msgs::Value value);
|
||||||
|
|
||||||
communication::bolt::Value ToBoltValue(msgs::Value value, const coordinator::ShardMap &shard_map,
|
communication::bolt::Value ToBoltValue(msgs::Value value,
|
||||||
|
const msgs::ShardRequestManagerInterface *shard_request_manager,
|
||||||
storage::v3::View view);
|
storage::v3::View view);
|
||||||
|
|
||||||
} // namespace memgraph::glue::v2
|
} // namespace memgraph::glue::v2
|
||||||
|
@ -407,9 +407,8 @@ DEFINE_string(organization_name, "", "Organization name.");
|
|||||||
struct SessionData {
|
struct SessionData {
|
||||||
// Explicit constructor here to ensure that pointers to all objects are
|
// Explicit constructor here to ensure that pointers to all objects are
|
||||||
// supplied.
|
// supplied.
|
||||||
SessionData(memgraph::coordinator::ShardMap &shard_map, memgraph::query::v2::InterpreterContext *interpreter_context)
|
explicit SessionData(memgraph::query::v2::InterpreterContext *interpreter_context)
|
||||||
: shard_map(&shard_map), interpreter_context(interpreter_context) {}
|
: interpreter_context(interpreter_context) {}
|
||||||
memgraph::coordinator::ShardMap *shard_map;
|
|
||||||
memgraph::query::v2::InterpreterContext *interpreter_context;
|
memgraph::query::v2::InterpreterContext *interpreter_context;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -424,7 +423,6 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
|
|||||||
memgraph::communication::v2::OutputStream *output_stream)
|
memgraph::communication::v2::OutputStream *output_stream)
|
||||||
: memgraph::communication::bolt::Session<memgraph::communication::v2::InputStream,
|
: memgraph::communication::bolt::Session<memgraph::communication::v2::InputStream,
|
||||||
memgraph::communication::v2::OutputStream>(input_stream, output_stream),
|
memgraph::communication::v2::OutputStream>(input_stream, output_stream),
|
||||||
shard_map_(data.shard_map),
|
|
||||||
interpreter_(data.interpreter_context),
|
interpreter_(data.interpreter_context),
|
||||||
endpoint_(endpoint) {}
|
endpoint_(endpoint) {}
|
||||||
|
|
||||||
@ -455,7 +453,7 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
|
|||||||
|
|
||||||
std::map<std::string, memgraph::communication::bolt::Value> Pull(TEncoder *encoder, std::optional<int> n,
|
std::map<std::string, memgraph::communication::bolt::Value> Pull(TEncoder *encoder, std::optional<int> n,
|
||||||
std::optional<int> qid) override {
|
std::optional<int> qid) override {
|
||||||
TypedValueResultStream stream(encoder, *shard_map_);
|
TypedValueResultStream stream(encoder, interpreter_.GetShardRequestManager());
|
||||||
return PullResults(stream, n, qid);
|
return PullResults(stream, n, qid);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -482,7 +480,8 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
|
|||||||
const auto &summary = interpreter_.Pull(&stream, n, qid);
|
const auto &summary = interpreter_.Pull(&stream, n, qid);
|
||||||
std::map<std::string, memgraph::communication::bolt::Value> decoded_summary;
|
std::map<std::string, memgraph::communication::bolt::Value> decoded_summary;
|
||||||
for (const auto &kv : summary) {
|
for (const auto &kv : summary) {
|
||||||
auto maybe_value = memgraph::glue::v2::ToBoltValue(kv.second, *shard_map_, memgraph::storage::v3::View::NEW);
|
auto maybe_value = memgraph::glue::v2::ToBoltValue(kv.second, interpreter_.GetShardRequestManager(),
|
||||||
|
memgraph::storage::v3::View::NEW);
|
||||||
if (maybe_value.HasError()) {
|
if (maybe_value.HasError()) {
|
||||||
switch (maybe_value.GetError()) {
|
switch (maybe_value.GetError()) {
|
||||||
case memgraph::storage::v3::Error::DELETED_OBJECT:
|
case memgraph::storage::v3::Error::DELETED_OBJECT:
|
||||||
@ -507,14 +506,14 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
|
|||||||
/// before forwarding the calls to original TEncoder.
|
/// before forwarding the calls to original TEncoder.
|
||||||
class TypedValueResultStream {
|
class TypedValueResultStream {
|
||||||
public:
|
public:
|
||||||
TypedValueResultStream(TEncoder *encoder, const memgraph::coordinator::ShardMap &shard_map)
|
TypedValueResultStream(TEncoder *encoder, const memgraph::msgs::ShardRequestManagerInterface *shard_request_manager)
|
||||||
: encoder_(encoder), shard_map_(&shard_map) {}
|
: encoder_(encoder), shard_request_manager_(shard_request_manager) {}
|
||||||
|
|
||||||
void Result(const std::vector<memgraph::query::v2::TypedValue> &values) {
|
void Result(const std::vector<memgraph::query::v2::TypedValue> &values) {
|
||||||
std::vector<memgraph::communication::bolt::Value> decoded_values;
|
std::vector<memgraph::communication::bolt::Value> decoded_values;
|
||||||
decoded_values.reserve(values.size());
|
decoded_values.reserve(values.size());
|
||||||
for (const auto &v : values) {
|
for (const auto &v : values) {
|
||||||
auto maybe_value = memgraph::glue::v2::ToBoltValue(v, *shard_map_, memgraph::storage::v3::View::NEW);
|
auto maybe_value = memgraph::glue::v2::ToBoltValue(v, shard_request_manager_, memgraph::storage::v3::View::NEW);
|
||||||
if (maybe_value.HasError()) {
|
if (maybe_value.HasError()) {
|
||||||
switch (maybe_value.GetError()) {
|
switch (maybe_value.GetError()) {
|
||||||
case memgraph::storage::v3::Error::DELETED_OBJECT:
|
case memgraph::storage::v3::Error::DELETED_OBJECT:
|
||||||
@ -534,12 +533,8 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
TEncoder *encoder_;
|
TEncoder *encoder_;
|
||||||
// NOTE: Needed only for ToBoltValue conversions
|
const memgraph::msgs::ShardRequestManagerInterface *shard_request_manager_{nullptr};
|
||||||
const memgraph::coordinator::ShardMap *shard_map_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// NOTE: Needed only for ToBoltValue conversions
|
|
||||||
const memgraph::coordinator::ShardMap *shard_map_;
|
|
||||||
memgraph::query::v2::Interpreter interpreter_;
|
memgraph::query::v2::Interpreter interpreter_;
|
||||||
memgraph::communication::v2::ServerEndpoint endpoint_;
|
memgraph::communication::v2::ServerEndpoint endpoint_;
|
||||||
};
|
};
|
||||||
@ -680,7 +675,7 @@ int main(int argc, char **argv) {
|
|||||||
std::move(io),
|
std::move(io),
|
||||||
mm.CoordinatorAddress()};
|
mm.CoordinatorAddress()};
|
||||||
|
|
||||||
SessionData session_data{sm, &interpreter_context};
|
SessionData session_data{&interpreter_context};
|
||||||
|
|
||||||
interpreter_context.auth = nullptr;
|
interpreter_context.auth = nullptr;
|
||||||
interpreter_context.auth_checker = nullptr;
|
interpreter_context.auth_checker = nullptr;
|
||||||
|
@ -11,38 +11,59 @@
|
|||||||
|
|
||||||
#include "query/v2/accessors.hpp"
|
#include "query/v2/accessors.hpp"
|
||||||
#include "query/v2/requests.hpp"
|
#include "query/v2/requests.hpp"
|
||||||
|
#include "query/v2/shard_request_manager.hpp"
|
||||||
#include "storage/v3/id_types.hpp"
|
#include "storage/v3/id_types.hpp"
|
||||||
|
|
||||||
namespace memgraph::query::v2::accessors {
|
namespace memgraph::query::v2::accessors {
|
||||||
EdgeAccessor::EdgeAccessor(Edge edge) : edge(std::move(edge)) {}
|
EdgeAccessor::EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager)
|
||||||
|
: edge(std::move(edge)), manager_(manager) {}
|
||||||
|
|
||||||
EdgeTypeId EdgeAccessor::EdgeType() const { return edge.type.id; }
|
EdgeTypeId EdgeAccessor::EdgeType() const { return edge.type.id; }
|
||||||
|
|
||||||
const std::vector<std::pair<PropertyId, Value>> &EdgeAccessor::Properties() const {
|
const std::vector<std::pair<PropertyId, Value>> &EdgeAccessor::Properties() const { return edge.properties; }
|
||||||
return edge.properties;
|
|
||||||
// std::map<std::string, TypedValue> res;
|
|
||||||
// for (const auto &[name, value] : *properties) {
|
|
||||||
// res[name] = ValueToTypedValue(value);
|
|
||||||
// }
|
|
||||||
// return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
|
Value EdgeAccessor::GetProperty(const std::string &prop_name) const {
|
||||||
Value EdgeAccessor::GetProperty(const std::string & /*prop_name*/) const {
|
auto prop_id = manager_->NameToProperty(prop_name);
|
||||||
// TODO(kostasrim) fix this
|
auto it = std::find_if(edge.properties.begin(), edge.properties.end(), [&](auto &pr) { return prop_id == pr.first; });
|
||||||
return {};
|
if (it == edge.properties.end()) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
return it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
const Edge &EdgeAccessor::GetEdge() const { return edge; }
|
const Edge &EdgeAccessor::GetEdge() const { return edge; }
|
||||||
|
|
||||||
bool EdgeAccessor::IsCycle() const { return edge.src == edge.dst; };
|
bool EdgeAccessor::IsCycle() const { return edge.src == edge.dst; };
|
||||||
|
|
||||||
VertexAccessor EdgeAccessor::To() const { return VertexAccessor(Vertex{edge.dst}, {}); }
|
VertexAccessor EdgeAccessor::To() const {
|
||||||
|
return VertexAccessor(Vertex{edge.dst}, std::vector<std::pair<PropertyId, msgs::Value>>{}, manager_);
|
||||||
|
}
|
||||||
|
|
||||||
VertexAccessor EdgeAccessor::From() const { return VertexAccessor(Vertex{edge.src}, {}); }
|
VertexAccessor EdgeAccessor::From() const {
|
||||||
|
return VertexAccessor(Vertex{edge.src}, std::vector<std::pair<PropertyId, msgs::Value>>{}, manager_);
|
||||||
|
}
|
||||||
|
|
||||||
VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props)
|
VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props,
|
||||||
: vertex(std::move(v)), properties(std::move(props)) {}
|
const msgs::ShardRequestManagerInterface *manager)
|
||||||
|
: vertex(std::move(v)), properties(std::move(props)), manager_(manager) {}
|
||||||
|
|
||||||
|
VertexAccessor::VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props,
|
||||||
|
const msgs::ShardRequestManagerInterface *manager)
|
||||||
|
: vertex(std::move(v)), manager_(manager) {
|
||||||
|
properties.reserve(props.size());
|
||||||
|
for (auto &[id, value] : props) {
|
||||||
|
properties.emplace_back(std::make_pair(id, std::move(value)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
VertexAccessor::VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props,
|
||||||
|
const msgs::ShardRequestManagerInterface *manager)
|
||||||
|
: vertex(std::move(v)), manager_(manager) {
|
||||||
|
properties.reserve(props.size());
|
||||||
|
for (const auto &[id, value] : props) {
|
||||||
|
properties.emplace_back(std::make_pair(id, value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Label VertexAccessor::PrimaryLabel() const { return vertex.id.first; }
|
Label VertexAccessor::PrimaryLabel() const { return vertex.id.first; }
|
||||||
|
|
||||||
@ -58,15 +79,16 @@ bool VertexAccessor::HasLabel(Label &label) const {
|
|||||||
const std::vector<std::pair<PropertyId, Value>> &VertexAccessor::Properties() const { return properties; }
|
const std::vector<std::pair<PropertyId, Value>> &VertexAccessor::Properties() const { return properties; }
|
||||||
|
|
||||||
Value VertexAccessor::GetProperty(PropertyId prop_id) const {
|
Value VertexAccessor::GetProperty(PropertyId prop_id) const {
|
||||||
return std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; })->second;
|
auto it = std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; });
|
||||||
// return ValueToTypedValue(properties[prop_name]);
|
if (it == properties.end()) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
return it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
|
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
|
||||||
Value VertexAccessor::GetProperty(const std::string & /*prop_name*/) const {
|
Value VertexAccessor::GetProperty(const std::string &prop_name) const {
|
||||||
// TODO(kostasrim) Add string mapping
|
return GetProperty(manager_->NameToProperty(prop_name));
|
||||||
return {};
|
|
||||||
// return ValueToTypedValue(properties[prop_name]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs::Vertex VertexAccessor::GetVertex() const { return vertex; }
|
msgs::Vertex VertexAccessor::GetVertex() const { return vertex; }
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <map>
|
||||||
#include <optional>
|
#include <optional>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
@ -23,6 +24,10 @@
|
|||||||
#include "utils/memory.hpp"
|
#include "utils/memory.hpp"
|
||||||
#include "utils/memory_tracker.hpp"
|
#include "utils/memory_tracker.hpp"
|
||||||
|
|
||||||
|
namespace memgraph::msgs {
|
||||||
|
class ShardRequestManagerInterface;
|
||||||
|
} // namespace memgraph::msgs
|
||||||
|
|
||||||
namespace memgraph::query::v2::accessors {
|
namespace memgraph::query::v2::accessors {
|
||||||
|
|
||||||
using Value = memgraph::msgs::Value;
|
using Value = memgraph::msgs::Value;
|
||||||
@ -36,7 +41,7 @@ class VertexAccessor;
|
|||||||
|
|
||||||
class EdgeAccessor final {
|
class EdgeAccessor final {
|
||||||
public:
|
public:
|
||||||
explicit EdgeAccessor(Edge edge);
|
explicit EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager);
|
||||||
|
|
||||||
[[nodiscard]] EdgeTypeId EdgeType() const;
|
[[nodiscard]] EdgeTypeId EdgeType() const;
|
||||||
|
|
||||||
@ -64,6 +69,7 @@ class EdgeAccessor final {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
Edge edge;
|
Edge edge;
|
||||||
|
const msgs::ShardRequestManagerInterface *manager_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class VertexAccessor final {
|
class VertexAccessor final {
|
||||||
@ -71,7 +77,11 @@ class VertexAccessor final {
|
|||||||
using PropertyId = msgs::PropertyId;
|
using PropertyId = msgs::PropertyId;
|
||||||
using Label = msgs::Label;
|
using Label = msgs::Label;
|
||||||
using VertexId = msgs::VertexId;
|
using VertexId = msgs::VertexId;
|
||||||
VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props);
|
VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props,
|
||||||
|
const msgs::ShardRequestManagerInterface *manager);
|
||||||
|
|
||||||
|
VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props, const msgs::ShardRequestManagerInterface *manager);
|
||||||
|
VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props, const msgs::ShardRequestManagerInterface *manager);
|
||||||
|
|
||||||
[[nodiscard]] Label PrimaryLabel() const;
|
[[nodiscard]] Label PrimaryLabel() const;
|
||||||
|
|
||||||
@ -140,6 +150,7 @@ class VertexAccessor final {
|
|||||||
private:
|
private:
|
||||||
Vertex vertex;
|
Vertex vertex;
|
||||||
std::vector<std::pair<PropertyId, Value>> properties;
|
std::vector<std::pair<PropertyId, Value>> properties;
|
||||||
|
const msgs::ShardRequestManagerInterface *manager_;
|
||||||
};
|
};
|
||||||
|
|
||||||
// inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }
|
// inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }
|
||||||
|
@ -23,6 +23,10 @@
|
|||||||
#include "storage/v3/property_value.hpp"
|
#include "storage/v3/property_value.hpp"
|
||||||
#include "storage/v3/view.hpp"
|
#include "storage/v3/view.hpp"
|
||||||
|
|
||||||
|
namespace memgraph::msgs {
|
||||||
|
class ShardRequestManagerInterface;
|
||||||
|
} // namespace memgraph::msgs
|
||||||
|
|
||||||
namespace memgraph::query::v2 {
|
namespace memgraph::query::v2 {
|
||||||
|
|
||||||
inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); };
|
inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); };
|
||||||
@ -32,13 +36,14 @@ class Callable {
|
|||||||
auto operator()(const memgraph::storage::v3::PropertyValue &val) const {
|
auto operator()(const memgraph::storage::v3::PropertyValue &val) const {
|
||||||
return memgraph::storage::v3::PropertyToTypedValue<TypedValue>(val);
|
return memgraph::storage::v3::PropertyToTypedValue<TypedValue>(val);
|
||||||
};
|
};
|
||||||
auto operator()(const msgs::Value &val) const { return ValueToTypedValue(val); };
|
auto operator()(const msgs::Value &val, memgraph::msgs::ShardRequestManagerInterface *manager) const {
|
||||||
|
return ValueToTypedValue(val, manager);
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace detail
|
} // namespace detail
|
||||||
using ExpressionEvaluator =
|
using ExpressionEvaluator = memgraph::expr::ExpressionEvaluator<
|
||||||
memgraph::expr::ExpressionEvaluator<TypedValue, memgraph::query::v2::EvaluationContext, DbAccessor,
|
TypedValue, memgraph::query::v2::EvaluationContext, memgraph::msgs::ShardRequestManagerInterface, storage::v3::View,
|
||||||
storage::v3::View, storage::v3::LabelId, msgs::Value, detail::Callable,
|
storage::v3::LabelId, msgs::Value, detail::Callable, memgraph::storage::v3::Error, memgraph::expr::QueryEngineTag>;
|
||||||
memgraph::storage::v3::Error, memgraph::expr::QueryEngineTag>;
|
|
||||||
|
|
||||||
} // namespace memgraph::query::v2
|
} // namespace memgraph::query::v2
|
||||||
|
@ -13,10 +13,11 @@
|
|||||||
#include "bindings/typed_value.hpp"
|
#include "bindings/typed_value.hpp"
|
||||||
#include "query/v2/accessors.hpp"
|
#include "query/v2/accessors.hpp"
|
||||||
#include "query/v2/requests.hpp"
|
#include "query/v2/requests.hpp"
|
||||||
|
#include "query/v2/shard_request_manager.hpp"
|
||||||
|
|
||||||
namespace memgraph::query::v2 {
|
namespace memgraph::query::v2 {
|
||||||
|
|
||||||
inline TypedValue ValueToTypedValue(const msgs::Value &value) {
|
inline TypedValue ValueToTypedValue(const msgs::Value &value, msgs::ShardRequestManagerInterface *manager) {
|
||||||
using Value = msgs::Value;
|
using Value = msgs::Value;
|
||||||
switch (value.type) {
|
switch (value.type) {
|
||||||
case Value::Type::Null:
|
case Value::Type::Null:
|
||||||
@ -34,7 +35,7 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) {
|
|||||||
std::vector<TypedValue> dst;
|
std::vector<TypedValue> dst;
|
||||||
dst.reserve(lst.size());
|
dst.reserve(lst.size());
|
||||||
for (const auto &elem : lst) {
|
for (const auto &elem : lst) {
|
||||||
dst.push_back(ValueToTypedValue(elem));
|
dst.push_back(ValueToTypedValue(elem, manager));
|
||||||
}
|
}
|
||||||
return TypedValue(std::move(dst));
|
return TypedValue(std::move(dst));
|
||||||
}
|
}
|
||||||
@ -42,14 +43,15 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) {
|
|||||||
const auto &value_map = value.map_v;
|
const auto &value_map = value.map_v;
|
||||||
std::map<std::string, TypedValue> dst;
|
std::map<std::string, TypedValue> dst;
|
||||||
for (const auto &[key, val] : value_map) {
|
for (const auto &[key, val] : value_map) {
|
||||||
dst[key] = ValueToTypedValue(val);
|
dst[key] = ValueToTypedValue(val, manager);
|
||||||
}
|
}
|
||||||
return TypedValue(std::move(dst));
|
return TypedValue(std::move(dst));
|
||||||
}
|
}
|
||||||
case Value::Type::Vertex:
|
case Value::Type::Vertex:
|
||||||
return TypedValue(accessors::VertexAccessor(value.vertex_v, {}));
|
return TypedValue(accessors::VertexAccessor(
|
||||||
|
value.vertex_v, std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>{}, manager));
|
||||||
case Value::Type::Edge:
|
case Value::Type::Edge:
|
||||||
return TypedValue(accessors::EdgeAccessor(value.edge_v));
|
return TypedValue(accessors::EdgeAccessor(value.edge_v, manager));
|
||||||
}
|
}
|
||||||
throw std::runtime_error("Incorrect type in conversion");
|
throw std::runtime_error("Incorrect type in conversion");
|
||||||
}
|
}
|
||||||
|
@ -22,8 +22,8 @@
|
|||||||
|
|
||||||
#include "query/v2/bindings/typed_value.hpp"
|
#include "query/v2/bindings/typed_value.hpp"
|
||||||
#include "query/v2/conversions.hpp"
|
#include "query/v2/conversions.hpp"
|
||||||
#include "query/v2/db_accessor.hpp"
|
|
||||||
#include "query/v2/exceptions.hpp"
|
#include "query/v2/exceptions.hpp"
|
||||||
|
#include "query/v2/shard_request_manager.hpp"
|
||||||
#include "storage/v3/conversions.hpp"
|
#include "storage/v3/conversions.hpp"
|
||||||
#include "utils/string.hpp"
|
#include "utils/string.hpp"
|
||||||
#include "utils/temporal.hpp"
|
#include "utils/temporal.hpp"
|
||||||
|
@ -16,12 +16,15 @@
|
|||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
#include "query/v2/bindings/typed_value.hpp"
|
#include "query/v2/bindings/typed_value.hpp"
|
||||||
|
#include "query/v2/db_accessor.hpp"
|
||||||
#include "storage/v3/view.hpp"
|
#include "storage/v3/view.hpp"
|
||||||
#include "utils/memory.hpp"
|
#include "utils/memory.hpp"
|
||||||
|
|
||||||
namespace memgraph::query::v2 {
|
namespace memgraph::msgs {
|
||||||
|
class ShardRequestManagerInterface;
|
||||||
|
} // namespace memgraph::msgs
|
||||||
|
|
||||||
class DbAccessor;
|
namespace memgraph::query::v2 {
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
const char kStartsWith[] = "STARTSWITH";
|
const char kStartsWith[] = "STARTSWITH";
|
||||||
@ -31,7 +34,9 @@ const char kId[] = "ID";
|
|||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
struct FunctionContext {
|
struct FunctionContext {
|
||||||
DbAccessor *db_accessor;
|
// TODO(kostasrim) consider optional here. ShardRequestManager does not exist on the storage.
|
||||||
|
// DbAccessor *db_accessor;
|
||||||
|
msgs::ShardRequestManagerInterface *manager;
|
||||||
utils::MemoryResource *memory;
|
utils::MemoryResource *memory;
|
||||||
int64_t timestamp;
|
int64_t timestamp;
|
||||||
std::unordered_map<std::string, int64_t> *counters;
|
std::unordered_map<std::string, int64_t> *counters;
|
||||||
|
@ -144,7 +144,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler {
|
|||||||
/// @throw QueryRuntimeException if an error ocurred.
|
/// @throw QueryRuntimeException if an error ocurred.
|
||||||
|
|
||||||
Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters ¶meters,
|
Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters ¶meters,
|
||||||
DbAccessor *db_accessor) {
|
msgs::ShardRequestManagerInterface *manager) {
|
||||||
// Empty frame for evaluation of password expression. This is OK since
|
// Empty frame for evaluation of password expression. This is OK since
|
||||||
// password should be either null or string literal and it's evaluation
|
// password should be either null or string literal and it's evaluation
|
||||||
// should not depend on frame.
|
// should not depend on frame.
|
||||||
@ -155,7 +155,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
|
|||||||
// the argument to Callback.
|
// the argument to Callback.
|
||||||
evaluation_context.timestamp = QueryTimestamp();
|
evaluation_context.timestamp = QueryTimestamp();
|
||||||
evaluation_context.parameters = parameters;
|
evaluation_context.parameters = parameters;
|
||||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
|
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
|
||||||
|
|
||||||
std::string username = auth_query->user_;
|
std::string username = auth_query->user_;
|
||||||
std::string rolename = auth_query->role_;
|
std::string rolename = auth_query->role_;
|
||||||
@ -313,7 +313,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
|
|||||||
}
|
}
|
||||||
|
|
||||||
Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters ¶meters,
|
Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters ¶meters,
|
||||||
InterpreterContext *interpreter_context, DbAccessor *db_accessor,
|
InterpreterContext *interpreter_context, msgs::ShardRequestManagerInterface *manager,
|
||||||
std::vector<Notification> *notifications) {
|
std::vector<Notification> *notifications) {
|
||||||
expr::Frame<TypedValue> frame(0);
|
expr::Frame<TypedValue> frame(0);
|
||||||
SymbolTable symbol_table;
|
SymbolTable symbol_table;
|
||||||
@ -322,7 +322,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
|
|||||||
// the argument to Callback.
|
// the argument to Callback.
|
||||||
evaluation_context.timestamp = QueryTimestamp();
|
evaluation_context.timestamp = QueryTimestamp();
|
||||||
evaluation_context.parameters = parameters;
|
evaluation_context.parameters = parameters;
|
||||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
|
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
|
||||||
|
|
||||||
Callback callback;
|
Callback callback;
|
||||||
switch (repl_query->action_) {
|
switch (repl_query->action_) {
|
||||||
@ -448,7 +448,8 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶meters, DbAccessor *db_accessor) {
|
Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶meters,
|
||||||
|
msgs::ShardRequestManagerInterface *manager) {
|
||||||
expr::Frame<TypedValue> frame(0);
|
expr::Frame<TypedValue> frame(0);
|
||||||
SymbolTable symbol_table;
|
SymbolTable symbol_table;
|
||||||
EvaluationContext evaluation_context;
|
EvaluationContext evaluation_context;
|
||||||
@ -458,7 +459,7 @@ Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶m
|
|||||||
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
|
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
|
||||||
.count();
|
.count();
|
||||||
evaluation_context.parameters = parameters;
|
evaluation_context.parameters = parameters;
|
||||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
|
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
|
||||||
|
|
||||||
Callback callback;
|
Callback callback;
|
||||||
switch (setting_query->action_) {
|
switch (setting_query->action_) {
|
||||||
@ -886,7 +887,8 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
|
|||||||
EvaluationContext evaluation_context;
|
EvaluationContext evaluation_context;
|
||||||
evaluation_context.timestamp = QueryTimestamp();
|
evaluation_context.timestamp = QueryTimestamp();
|
||||||
evaluation_context.parameters = parsed_query.parameters;
|
evaluation_context.parameters = parsed_query.parameters;
|
||||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::v3::View::OLD);
|
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager,
|
||||||
|
storage::v3::View::OLD);
|
||||||
const auto memory_limit =
|
const auto memory_limit =
|
||||||
expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
|
expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
|
||||||
if (memory_limit) {
|
if (memory_limit) {
|
||||||
@ -901,7 +903,6 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
|
|||||||
"convert the parsed row values to the appropriate type. This can be done using the built-in "
|
"convert the parsed row values to the appropriate type. This can be done using the built-in "
|
||||||
"conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
|
"conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
|
||||||
}
|
}
|
||||||
shard_request_manager->StartTransaction();
|
|
||||||
auto plan = CypherQueryToPlan(
|
auto plan = CypherQueryToPlan(
|
||||||
parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query, parsed_query.parameters,
|
parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query, parsed_query.parameters,
|
||||||
parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, shard_request_manager);
|
parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, shard_request_manager);
|
||||||
@ -957,10 +958,10 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string
|
|||||||
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
|
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
|
||||||
MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN");
|
MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN");
|
||||||
|
|
||||||
auto cypher_query_plan =
|
auto cypher_query_plan = CypherQueryToPlan(
|
||||||
CypherQueryToPlan(parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage),
|
parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query,
|
||||||
cypher_query, parsed_inner_query.parameters,
|
parsed_inner_query.parameters, parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr,
|
||||||
parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, nullptr);
|
shard_request_manager);
|
||||||
|
|
||||||
std::stringstream printed_plan;
|
std::stringstream printed_plan;
|
||||||
plan::PrettyPrint(*shard_request_manager, &cypher_query_plan->plan(), &printed_plan);
|
plan::PrettyPrint(*shard_request_manager, &cypher_query_plan->plan(), &printed_plan);
|
||||||
@ -1030,7 +1031,8 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
|
|||||||
EvaluationContext evaluation_context;
|
EvaluationContext evaluation_context;
|
||||||
evaluation_context.timestamp = QueryTimestamp();
|
evaluation_context.timestamp = QueryTimestamp();
|
||||||
evaluation_context.parameters = parsed_inner_query.parameters;
|
evaluation_context.parameters = parsed_inner_query.parameters;
|
||||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::v3::View::OLD);
|
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager,
|
||||||
|
storage::v3::View::OLD);
|
||||||
const auto memory_limit =
|
const auto memory_limit =
|
||||||
expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
|
expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
|
||||||
|
|
||||||
@ -1179,14 +1181,15 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
|
|||||||
|
|
||||||
PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
|
PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
|
||||||
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
|
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
|
||||||
DbAccessor *dba, utils::MemoryResource *execution_memory) {
|
DbAccessor *dba, utils::MemoryResource *execution_memory,
|
||||||
|
msgs::ShardRequestManagerInterface *manager) {
|
||||||
if (in_explicit_transaction) {
|
if (in_explicit_transaction) {
|
||||||
throw UserModificationInMulticommandTxException();
|
throw UserModificationInMulticommandTxException();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto *auth_query = utils::Downcast<AuthQuery>(parsed_query.query);
|
auto *auth_query = utils::Downcast<AuthQuery>(parsed_query.query);
|
||||||
|
|
||||||
auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, dba);
|
auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, manager);
|
||||||
|
|
||||||
SymbolTable symbol_table;
|
SymbolTable symbol_table;
|
||||||
std::vector<Symbol> output_symbols;
|
std::vector<Symbol> output_symbols;
|
||||||
@ -1215,14 +1218,14 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa
|
|||||||
|
|
||||||
PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
|
PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
|
||||||
std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
|
std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
|
||||||
DbAccessor *dba) {
|
msgs::ShardRequestManagerInterface *manager) {
|
||||||
if (in_explicit_transaction) {
|
if (in_explicit_transaction) {
|
||||||
throw ReplicationModificationInMulticommandTxException();
|
throw ReplicationModificationInMulticommandTxException();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto *replication_query = utils::Downcast<ReplicationQuery>(parsed_query.query);
|
auto *replication_query = utils::Downcast<ReplicationQuery>(parsed_query.query);
|
||||||
auto callback =
|
auto callback =
|
||||||
HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, dba, notifications);
|
HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, manager, notifications);
|
||||||
|
|
||||||
return PreparedQuery{callback.header, std::move(parsed_query.required_privileges),
|
return PreparedQuery{callback.header, std::move(parsed_query.required_privileges),
|
||||||
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
|
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
|
||||||
@ -1310,14 +1313,15 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli
|
|||||||
throw SemanticException("CreateSnapshot query is not supported!");
|
throw SemanticException("CreateSnapshot query is not supported!");
|
||||||
}
|
}
|
||||||
|
|
||||||
PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, DbAccessor *dba) {
|
PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
|
||||||
|
msgs::ShardRequestManagerInterface *manager) {
|
||||||
if (in_explicit_transaction) {
|
if (in_explicit_transaction) {
|
||||||
throw SettingConfigInMulticommandTxException{};
|
throw SettingConfigInMulticommandTxException{};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto *setting_query = utils::Downcast<SettingQuery>(parsed_query.query);
|
auto *setting_query = utils::Downcast<SettingQuery>(parsed_query.query);
|
||||||
MG_ASSERT(setting_query);
|
MG_ASSERT(setting_query);
|
||||||
auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, dba);
|
auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, manager);
|
||||||
|
|
||||||
return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
|
return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
|
||||||
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
|
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
|
||||||
@ -1511,6 +1515,11 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
|||||||
ParsedQuery parsed_query =
|
ParsedQuery parsed_query =
|
||||||
ParseQuery(query_string, params, &interpreter_context_->ast_cache, interpreter_context_->config.query);
|
ParseQuery(query_string, params, &interpreter_context_->ast_cache, interpreter_context_->config.query);
|
||||||
query_execution->summary["parsing_time"] = parsing_timer.Elapsed().count();
|
query_execution->summary["parsing_time"] = parsing_timer.Elapsed().count();
|
||||||
|
if (!in_explicit_transaction_ &&
|
||||||
|
(utils::Downcast<CypherQuery>(parsed_query.query) || utils::Downcast<ExplainQuery>(parsed_query.query) ||
|
||||||
|
utils::Downcast<ProfileQuery>(parsed_query.query))) {
|
||||||
|
shard_request_manager_->StartTransaction();
|
||||||
|
}
|
||||||
|
|
||||||
utils::Timer planning_timer;
|
utils::Timer planning_timer;
|
||||||
PreparedQuery prepared_query;
|
PreparedQuery prepared_query;
|
||||||
@ -1533,9 +1542,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
|||||||
prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_,
|
prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_,
|
||||||
&query_execution->notifications, interpreter_context_);
|
&query_execution->notifications, interpreter_context_);
|
||||||
} else if (utils::Downcast<AuthQuery>(parsed_query.query)) {
|
} else if (utils::Downcast<AuthQuery>(parsed_query.query)) {
|
||||||
prepared_query = PrepareAuthQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
|
prepared_query = PrepareAuthQuery(
|
||||||
interpreter_context_, &*execution_db_accessor_,
|
std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_,
|
||||||
&query_execution->execution_memory_with_exception);
|
&*execution_db_accessor_, &query_execution->execution_memory_with_exception, shard_request_manager_.get());
|
||||||
} else if (utils::Downcast<InfoQuery>(parsed_query.query)) {
|
} else if (utils::Downcast<InfoQuery>(parsed_query.query)) {
|
||||||
prepared_query = PrepareInfoQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
|
prepared_query = PrepareInfoQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
|
||||||
interpreter_context_, interpreter_context_->db,
|
interpreter_context_, interpreter_context_->db,
|
||||||
@ -1546,7 +1555,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
|||||||
} else if (utils::Downcast<ReplicationQuery>(parsed_query.query)) {
|
} else if (utils::Downcast<ReplicationQuery>(parsed_query.query)) {
|
||||||
prepared_query =
|
prepared_query =
|
||||||
PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
|
PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
|
||||||
interpreter_context_, &*execution_db_accessor_);
|
interpreter_context_, shard_request_manager_.get());
|
||||||
} else if (utils::Downcast<LockPathQuery>(parsed_query.query)) {
|
} else if (utils::Downcast<LockPathQuery>(parsed_query.query)) {
|
||||||
prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
|
prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
|
||||||
&*execution_db_accessor_);
|
&*execution_db_accessor_);
|
||||||
@ -1563,7 +1572,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
|||||||
prepared_query =
|
prepared_query =
|
||||||
PrepareCreateSnapshotQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
|
PrepareCreateSnapshotQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
|
||||||
} else if (utils::Downcast<SettingQuery>(parsed_query.query)) {
|
} else if (utils::Downcast<SettingQuery>(parsed_query.query)) {
|
||||||
prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, &*execution_db_accessor_);
|
prepared_query =
|
||||||
|
PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, shard_request_manager_.get());
|
||||||
} else if (utils::Downcast<VersionQuery>(parsed_query.query)) {
|
} else if (utils::Downcast<VersionQuery>(parsed_query.query)) {
|
||||||
prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_);
|
prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_);
|
||||||
} else if (utils::Downcast<SchemaQuery>(parsed_query.query)) {
|
} else if (utils::Downcast<SchemaQuery>(parsed_query.query)) {
|
||||||
|
@ -296,6 +296,8 @@ class Interpreter final {
|
|||||||
*/
|
*/
|
||||||
void Abort();
|
void Abort();
|
||||||
|
|
||||||
|
const msgs::ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct QueryExecution {
|
struct QueryExecution {
|
||||||
std::optional<PreparedQuery> prepared_query;
|
std::optional<PreparedQuery> prepared_query;
|
||||||
|
@ -169,6 +169,7 @@ class DistributedCreateNodeCursor : public Cursor {
|
|||||||
if (input_cursor_->Pull(frame, context)) {
|
if (input_cursor_->Pull(frame, context)) {
|
||||||
auto &shard_manager = context.shard_request_manager;
|
auto &shard_manager = context.shard_request_manager;
|
||||||
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
|
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
|
||||||
|
PlaceNodeOnTheFrame(frame, context);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -179,8 +180,19 @@ class DistributedCreateNodeCursor : public Cursor {
|
|||||||
|
|
||||||
void Reset() override { state_ = {}; }
|
void Reset() override { state_ = {}; }
|
||||||
|
|
||||||
std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) const {
|
void PlaceNodeOnTheFrame(Frame &frame, ExecutionContext &context) {
|
||||||
|
// TODO(kostasrim) Make this work with batching
|
||||||
|
const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]};
|
||||||
|
msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[0])};
|
||||||
|
frame[nodes_info_.front()->symbol] = TypedValue(
|
||||||
|
query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.shard_request_manager));
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) {
|
||||||
std::vector<msgs::NewVertex> requests;
|
std::vector<msgs::NewVertex> requests;
|
||||||
|
// TODO(kostasrim) this assertion should be removed once we support multiple vertex creation
|
||||||
|
MG_ASSERT(nodes_info_.size() == 1);
|
||||||
|
msgs::PrimaryKey pk;
|
||||||
for (const auto &node_info : nodes_info_) {
|
for (const auto &node_info : nodes_info_) {
|
||||||
msgs::NewVertex rqst;
|
msgs::NewVertex rqst;
|
||||||
MG_ASSERT(!node_info->labels.empty(), "Cannot determine primary label");
|
MG_ASSERT(!node_info->labels.empty(), "Cannot determine primary label");
|
||||||
@ -188,17 +200,14 @@ class DistributedCreateNodeCursor : public Cursor {
|
|||||||
// TODO(jbajic) Fix properties not send,
|
// TODO(jbajic) Fix properties not send,
|
||||||
// suggestion: ignore distinction between properties and primary keys
|
// suggestion: ignore distinction between properties and primary keys
|
||||||
// since schema validation is done on storage side
|
// since schema validation is done on storage side
|
||||||
std::map<msgs::PropertyId, msgs::Value> properties;
|
|
||||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
|
||||||
storage::v3::View::NEW);
|
storage::v3::View::NEW);
|
||||||
if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) {
|
if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) {
|
||||||
for (const auto &[key, value_expression] : *node_info_properties) {
|
for (const auto &[key, value_expression] : *node_info_properties) {
|
||||||
TypedValue val = value_expression->Accept(evaluator);
|
TypedValue val = value_expression->Accept(evaluator);
|
||||||
|
|
||||||
if (context.shard_request_manager->IsPrimaryKey(primary_label, key)) {
|
if (context.shard_request_manager->IsPrimaryKey(primary_label, key)) {
|
||||||
rqst.primary_key.push_back(TypedValueToValue(val));
|
rqst.primary_key.push_back(TypedValueToValue(val));
|
||||||
} else {
|
pk.push_back(TypedValueToValue(val));
|
||||||
properties[key] = TypedValueToValue(val);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -207,9 +216,8 @@ class DistributedCreateNodeCursor : public Cursor {
|
|||||||
auto key_str = std::string(key);
|
auto key_str = std::string(key);
|
||||||
auto property_id = context.shard_request_manager->NameToProperty(key_str);
|
auto property_id = context.shard_request_manager->NameToProperty(key_str);
|
||||||
if (context.shard_request_manager->IsPrimaryKey(primary_label, property_id)) {
|
if (context.shard_request_manager->IsPrimaryKey(primary_label, property_id)) {
|
||||||
rqst.primary_key.push_back(storage::v3::TypedValueToValue(value));
|
rqst.primary_key.push_back(TypedValueToValue(value));
|
||||||
} else {
|
pk.push_back(TypedValueToValue(value));
|
||||||
properties[property_id] = TypedValueToValue(value);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -219,14 +227,18 @@ class DistributedCreateNodeCursor : public Cursor {
|
|||||||
}
|
}
|
||||||
// TODO(kostasrim) Copy non primary labels as well
|
// TODO(kostasrim) Copy non primary labels as well
|
||||||
rqst.label_ids.push_back(msgs::Label{.id = primary_label});
|
rqst.label_ids.push_back(msgs::Label{.id = primary_label});
|
||||||
|
src_vertex_props_.push_back(rqst.properties);
|
||||||
requests.push_back(std::move(rqst));
|
requests.push_back(std::move(rqst));
|
||||||
}
|
}
|
||||||
|
primary_keys_.push_back(std::move(pk));
|
||||||
return requests;
|
return requests;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const UniqueCursorPtr input_cursor_;
|
const UniqueCursorPtr input_cursor_;
|
||||||
std::vector<const NodeCreationInfo *> nodes_info_;
|
std::vector<const NodeCreationInfo *> nodes_info_;
|
||||||
|
std::vector<std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>> src_vertex_props_;
|
||||||
|
std::vector<msgs::PrimaryKey> primary_keys_;
|
||||||
msgs::ExecutionState<msgs::CreateVerticesRequest> state_;
|
msgs::ExecutionState<msgs::CreateVerticesRequest> state_;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -687,7 +699,7 @@ bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) {
|
|||||||
|
|
||||||
// Like all filters, newly set values should not affect filtering of old
|
// Like all filters, newly set values should not affect filtering of old
|
||||||
// nodes and edges.
|
// nodes and edges.
|
||||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.shard_request_manager,
|
||||||
storage::v3::View::OLD);
|
storage::v3::View::OLD);
|
||||||
while (input_cursor_->Pull(frame, context)) {
|
while (input_cursor_->Pull(frame, context)) {
|
||||||
if (EvaluateFilter(evaluator, self_.expression_)) return true;
|
if (EvaluateFilter(evaluator, self_.expression_)) return true;
|
||||||
@ -728,8 +740,8 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) {
|
|||||||
|
|
||||||
if (input_cursor_->Pull(frame, context)) {
|
if (input_cursor_->Pull(frame, context)) {
|
||||||
// Produce should always yield the latest results.
|
// Produce should always yield the latest results.
|
||||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||||
storage::v3::View::NEW);
|
context.shard_request_manager, storage::v3::View::NEW);
|
||||||
for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator);
|
for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
@ -1149,8 +1161,8 @@ class AggregateCursor : public Cursor {
|
|||||||
* aggregation results, and not on the number of inputs.
|
* aggregation results, and not on the number of inputs.
|
||||||
*/
|
*/
|
||||||
void ProcessAll(Frame *frame, ExecutionContext *context) {
|
void ProcessAll(Frame *frame, ExecutionContext *context) {
|
||||||
ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, context->db_accessor,
|
ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context,
|
||||||
storage::v3::View::NEW);
|
context->shard_request_manager, storage::v3::View::NEW);
|
||||||
while (input_cursor_->Pull(*frame, *context)) {
|
while (input_cursor_->Pull(*frame, *context)) {
|
||||||
ProcessOne(*frame, &evaluator);
|
ProcessOne(*frame, &evaluator);
|
||||||
}
|
}
|
||||||
@ -1370,8 +1382,8 @@ bool Skip::SkipCursor::Pull(Frame &frame, ExecutionContext &context) {
|
|||||||
// First successful pull from the input, evaluate the skip expression.
|
// First successful pull from the input, evaluate the skip expression.
|
||||||
// The skip expression doesn't contain identifiers so graph view
|
// The skip expression doesn't contain identifiers so graph view
|
||||||
// parameter is not important.
|
// parameter is not important.
|
||||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||||
storage::v3::View::OLD);
|
context.shard_request_manager, storage::v3::View::OLD);
|
||||||
TypedValue to_skip = self_.expression_->Accept(evaluator);
|
TypedValue to_skip = self_.expression_->Accept(evaluator);
|
||||||
if (to_skip.type() != TypedValue::Type::Int)
|
if (to_skip.type() != TypedValue::Type::Int)
|
||||||
throw QueryRuntimeException("Number of elements to skip must be an integer.");
|
throw QueryRuntimeException("Number of elements to skip must be an integer.");
|
||||||
@ -1425,8 +1437,8 @@ bool Limit::LimitCursor::Pull(Frame &frame, ExecutionContext &context) {
|
|||||||
if (limit_ == -1) {
|
if (limit_ == -1) {
|
||||||
// Limit expression doesn't contain identifiers so graph view is not
|
// Limit expression doesn't contain identifiers so graph view is not
|
||||||
// important.
|
// important.
|
||||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||||
storage::v3::View::OLD);
|
context.shard_request_manager, storage::v3::View::OLD);
|
||||||
TypedValue limit = self_.expression_->Accept(evaluator);
|
TypedValue limit = self_.expression_->Accept(evaluator);
|
||||||
if (limit.type() != TypedValue::Type::Int)
|
if (limit.type() != TypedValue::Type::Int)
|
||||||
throw QueryRuntimeException("Limit on number of returned elements must be an integer.");
|
throw QueryRuntimeException("Limit on number of returned elements must be an integer.");
|
||||||
@ -1481,8 +1493,8 @@ class OrderByCursor : public Cursor {
|
|||||||
SCOPED_PROFILE_OP("OrderBy");
|
SCOPED_PROFILE_OP("OrderBy");
|
||||||
|
|
||||||
if (!did_pull_all_) {
|
if (!did_pull_all_) {
|
||||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||||
storage::v3::View::OLD);
|
context.shard_request_manager, storage::v3::View::OLD);
|
||||||
auto *mem = cache_.get_allocator().GetMemoryResource();
|
auto *mem = cache_.get_allocator().GetMemoryResource();
|
||||||
while (input_cursor_->Pull(frame, context)) {
|
while (input_cursor_->Pull(frame, context)) {
|
||||||
// collect the order_by elements
|
// collect the order_by elements
|
||||||
@ -1739,8 +1751,8 @@ class UnwindCursor : public Cursor {
|
|||||||
if (!input_cursor_->Pull(frame, context)) return false;
|
if (!input_cursor_->Pull(frame, context)) return false;
|
||||||
|
|
||||||
// successful pull from input, initialize value and iterator
|
// successful pull from input, initialize value and iterator
|
||||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||||
storage::v3::View::OLD);
|
context.shard_request_manager, storage::v3::View::OLD);
|
||||||
TypedValue input_value = self_.input_expression_->Accept(evaluator);
|
TypedValue input_value = self_.input_expression_->Accept(evaluator);
|
||||||
if (input_value.type() != TypedValue::Type::List)
|
if (input_value.type() != TypedValue::Type::List)
|
||||||
throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type());
|
throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type());
|
||||||
@ -2217,7 +2229,7 @@ class LoadCsvCursor : public Cursor {
|
|||||||
// self_->delimiter_, and self_->quote_ earlier (say, in the interpreter.cpp)
|
// self_->delimiter_, and self_->quote_ earlier (say, in the interpreter.cpp)
|
||||||
// without massacring the code even worse than I did here
|
// without massacring the code even worse than I did here
|
||||||
if (UNLIKELY(!reader_)) {
|
if (UNLIKELY(!reader_)) {
|
||||||
reader_ = MakeReader(&context.evaluation_context);
|
reader_ = MakeReader(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool input_pulled = input_cursor_->Pull(frame, context);
|
bool input_pulled = input_cursor_->Pull(frame, context);
|
||||||
@ -2246,11 +2258,12 @@ class LoadCsvCursor : public Cursor {
|
|||||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
csv::Reader MakeReader(EvaluationContext *eval_context) {
|
csv::Reader MakeReader(ExecutionContext &context) {
|
||||||
|
auto &eval_context = context.evaluation_context;
|
||||||
Frame frame(0);
|
Frame frame(0);
|
||||||
SymbolTable symbol_table;
|
SymbolTable symbol_table;
|
||||||
DbAccessor *dba = nullptr;
|
auto evaluator =
|
||||||
auto evaluator = ExpressionEvaluator(&frame, symbol_table, *eval_context, dba, storage::v3::View::OLD);
|
ExpressionEvaluator(&frame, symbol_table, eval_context, context.shard_request_manager, storage::v3::View::OLD);
|
||||||
|
|
||||||
auto maybe_file = ToOptionalString(&evaluator, self_->file_);
|
auto maybe_file = ToOptionalString(&evaluator, self_->file_);
|
||||||
auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_);
|
auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_);
|
||||||
@ -2287,8 +2300,8 @@ class ForeachCursor : public Cursor {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||||
storage::v3::View::NEW);
|
context.shard_request_manager, storage::v3::View::NEW);
|
||||||
TypedValue expr_result = expression->Accept(evaluator);
|
TypedValue expr_result = expression->Accept(evaluator);
|
||||||
|
|
||||||
if (expr_result.IsNull()) {
|
if (expr_result.IsNull()) {
|
||||||
@ -2458,15 +2471,51 @@ class DistributedExpandCursor : public Cursor {
|
|||||||
: self_(self),
|
: self_(self),
|
||||||
input_cursor_(self.input_->MakeCursor(mem)),
|
input_cursor_(self.input_->MakeCursor(mem)),
|
||||||
current_in_edge_it_(current_in_edges_.begin()),
|
current_in_edge_it_(current_in_edges_.begin()),
|
||||||
current_out_edge_it_(current_out_edges_.begin()) {
|
current_out_edge_it_(current_out_edges_.begin()) {}
|
||||||
if (self_.common_.existing_node) {
|
|
||||||
throw QueryRuntimeException("Cannot use existing node with DistributedExpandOne cursor!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
using VertexAccessor = accessors::VertexAccessor;
|
using VertexAccessor = accessors::VertexAccessor;
|
||||||
using EdgeAccessor = accessors::EdgeAccessor;
|
using EdgeAccessor = accessors::EdgeAccessor;
|
||||||
|
|
||||||
|
static constexpr auto DirectionToMsgsDirection(const auto direction) {
|
||||||
|
switch (direction) {
|
||||||
|
case EdgeAtom::Direction::IN:
|
||||||
|
return msgs::EdgeDirection::IN;
|
||||||
|
case EdgeAtom::Direction::OUT:
|
||||||
|
return msgs::EdgeDirection::OUT;
|
||||||
|
case EdgeAtom::Direction::BOTH:
|
||||||
|
return msgs::EdgeDirection::BOTH;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
void PullDstVertex(Frame &frame, ExecutionContext &context, const EdgeAtom::Direction direction) {
|
||||||
|
if (self_.common_.existing_node) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
MG_ASSERT(direction != EdgeAtom::Direction::BOTH);
|
||||||
|
const auto &edge = frame[self_.common_.edge_symbol].ValueEdge();
|
||||||
|
static auto get_dst_vertex = [&edge](const EdgeAtom::Direction direction) {
|
||||||
|
switch (direction) {
|
||||||
|
case EdgeAtom::Direction::IN:
|
||||||
|
return edge.From().Id();
|
||||||
|
case EdgeAtom::Direction::OUT:
|
||||||
|
return edge.To().Id();
|
||||||
|
case EdgeAtom::Direction::BOTH:
|
||||||
|
throw std::runtime_error("EdgeDirection Both not implemented");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
msgs::ExpandOneRequest request;
|
||||||
|
// to not fetch any properties of the edges
|
||||||
|
request.edge_properties.emplace();
|
||||||
|
request.src_vertices.push_back(get_dst_vertex(direction));
|
||||||
|
request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN;
|
||||||
|
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
|
||||||
|
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
|
||||||
|
MG_ASSERT(result_rows.size() == 1);
|
||||||
|
auto &result_row = result_rows.front();
|
||||||
|
frame[self_.common_.node_symbol] = accessors::VertexAccessor(
|
||||||
|
msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.shard_request_manager);
|
||||||
|
}
|
||||||
|
|
||||||
bool InitEdges(Frame &frame, ExecutionContext &context) {
|
bool InitEdges(Frame &frame, ExecutionContext &context) {
|
||||||
// Input Vertex could be null if it is created by a failed optional match. In
|
// Input Vertex could be null if it is created by a failed optional match. In
|
||||||
// those cases we skip that input pull and continue with the next.
|
// those cases we skip that input pull and continue with the next.
|
||||||
@ -2480,44 +2529,41 @@ class DistributedExpandCursor : public Cursor {
|
|||||||
|
|
||||||
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
|
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
|
||||||
auto &vertex = vertex_value.ValueVertex();
|
auto &vertex = vertex_value.ValueVertex();
|
||||||
static constexpr auto direction_to_msgs_direction = [](const EdgeAtom::Direction direction) {
|
|
||||||
switch (direction) {
|
|
||||||
case EdgeAtom::Direction::IN:
|
|
||||||
return msgs::EdgeDirection::IN;
|
|
||||||
case EdgeAtom::Direction::OUT:
|
|
||||||
return msgs::EdgeDirection::OUT;
|
|
||||||
case EdgeAtom::Direction::BOTH:
|
|
||||||
return msgs::EdgeDirection::BOTH;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
msgs::ExpandOneRequest request;
|
msgs::ExpandOneRequest request;
|
||||||
request.direction = direction_to_msgs_direction(self_.common_.direction);
|
request.direction = DirectionToMsgsDirection(self_.common_.direction);
|
||||||
// to not fetch any properties of the edges
|
// to not fetch any properties of the edges
|
||||||
request.edge_properties.emplace();
|
request.edge_properties.emplace();
|
||||||
request.src_vertices.push_back(vertex.Id());
|
request.src_vertices.push_back(vertex.Id());
|
||||||
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
|
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
|
||||||
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
|
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
|
||||||
MG_ASSERT(result_rows.size() == 1);
|
|
||||||
auto &result_row = result_rows.front();
|
auto &result_row = result_rows.front();
|
||||||
|
|
||||||
const auto convert_edges = [&vertex](
|
if (self_.common_.existing_node) {
|
||||||
|
const auto &node = frame[self_.common_.node_symbol].ValueVertex().Id();
|
||||||
|
auto &in = result_row.in_edges_with_specific_properties;
|
||||||
|
std::erase_if(in, [&node](auto &edge) { return edge.other_end != node; });
|
||||||
|
auto &out = result_row.out_edges_with_specific_properties;
|
||||||
|
std::erase_if(out, [&node](auto &edge) { return edge.other_end != node; });
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto convert_edges = [&vertex, &context](
|
||||||
std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties> &&edge_messages,
|
std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties> &&edge_messages,
|
||||||
const EdgeAtom::Direction direction) {
|
const EdgeAtom::Direction direction) {
|
||||||
std::vector<EdgeAccessor> edge_accessors;
|
std::vector<EdgeAccessor> edge_accessors;
|
||||||
edge_accessors.reserve(edge_messages.size());
|
edge_accessors.reserve(edge_messages.size());
|
||||||
|
|
||||||
switch (direction) {
|
switch (direction) {
|
||||||
case EdgeAtom::Direction::IN: {
|
case EdgeAtom::Direction::IN: {
|
||||||
for (auto &edge : edge_messages) {
|
for (auto &edge : edge_messages) {
|
||||||
edge_accessors.emplace_back(
|
edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type},
|
||||||
msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type});
|
context.shard_request_manager);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case EdgeAtom::Direction::OUT: {
|
case EdgeAtom::Direction::OUT: {
|
||||||
for (auto &edge : edge_messages) {
|
for (auto &edge : edge_messages) {
|
||||||
edge_accessors.emplace_back(
|
edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type},
|
||||||
msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type});
|
context.shard_request_manager);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -2527,12 +2573,13 @@ class DistributedExpandCursor : public Cursor {
|
|||||||
}
|
}
|
||||||
return edge_accessors;
|
return edge_accessors;
|
||||||
};
|
};
|
||||||
|
|
||||||
current_in_edges_ =
|
current_in_edges_ =
|
||||||
convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::IN);
|
convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::IN);
|
||||||
current_in_edge_it_ = current_in_edges_.begin();
|
current_in_edge_it_ = current_in_edges_.begin();
|
||||||
current_in_edges_ =
|
current_out_edges_ =
|
||||||
convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::OUT);
|
convert_edges(std::move(result_row.out_edges_with_specific_properties), EdgeAtom::Direction::OUT);
|
||||||
current_in_edge_it_ = current_in_edges_.begin();
|
current_out_edge_it_ = current_out_edges_.begin();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2540,19 +2587,6 @@ class DistributedExpandCursor : public Cursor {
|
|||||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||||
SCOPED_PROFILE_OP("DistributedExpand");
|
SCOPED_PROFILE_OP("DistributedExpand");
|
||||||
// A helper function for expanding a node from an edge.
|
// A helper function for expanding a node from an edge.
|
||||||
auto pull_node = [this, &frame](const EdgeAccessor &new_edge, EdgeAtom::Direction direction) {
|
|
||||||
if (self_.common_.existing_node) return;
|
|
||||||
switch (direction) {
|
|
||||||
case EdgeAtom::Direction::IN:
|
|
||||||
frame[self_.common_.node_symbol] = new_edge.From();
|
|
||||||
break;
|
|
||||||
case EdgeAtom::Direction::OUT:
|
|
||||||
frame[self_.common_.node_symbol] = new_edge.To();
|
|
||||||
break;
|
|
||||||
case EdgeAtom::Direction::BOTH:
|
|
||||||
LOG_FATAL("Must indicate exact expansion direction here");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (MustAbort(context)) throw HintedAbortError();
|
if (MustAbort(context)) throw HintedAbortError();
|
||||||
@ -2561,7 +2595,7 @@ class DistributedExpandCursor : public Cursor {
|
|||||||
auto &edge = *current_in_edge_it_;
|
auto &edge = *current_in_edge_it_;
|
||||||
++current_in_edge_it_;
|
++current_in_edge_it_;
|
||||||
frame[self_.common_.edge_symbol] = edge;
|
frame[self_.common_.edge_symbol] = edge;
|
||||||
pull_node(edge, EdgeAtom::Direction::IN);
|
PullDstVertex(frame, context, EdgeAtom::Direction::IN);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2573,7 +2607,7 @@ class DistributedExpandCursor : public Cursor {
|
|||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
frame[self_.common_.edge_symbol] = edge;
|
frame[self_.common_.edge_symbol] = edge;
|
||||||
pull_node(edge, EdgeAtom::Direction::OUT);
|
PullDstVertex(frame, context, EdgeAtom::Direction::OUT);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -387,7 +387,6 @@ struct GetPropertiesResponse {
|
|||||||
enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 };
|
enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 };
|
||||||
|
|
||||||
struct ExpandOneRequest {
|
struct ExpandOneRequest {
|
||||||
// TODO(antaljanosbenjamin): Filtering based on the id of the other end of the edge?
|
|
||||||
Hlc transaction_id;
|
Hlc transaction_id;
|
||||||
std::vector<VertexId> src_vertices;
|
std::vector<VertexId> src_vertices;
|
||||||
// return types that type is in this list
|
// return types that type is in this list
|
||||||
|
@ -171,6 +171,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
|||||||
|
|
||||||
if (hlc_response.fresher_shard_map) {
|
if (hlc_response.fresher_shard_map) {
|
||||||
shards_map_ = hlc_response.fresher_shard_map.value();
|
shards_map_ = hlc_response.fresher_shard_map.value();
|
||||||
|
SetUpNameIdMappers();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -186,6 +187,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
|||||||
|
|
||||||
if (hlc_response.fresher_shard_map) {
|
if (hlc_response.fresher_shard_map) {
|
||||||
shards_map_ = hlc_response.fresher_shard_map.value();
|
shards_map_ = hlc_response.fresher_shard_map.value();
|
||||||
|
SetUpNameIdMappers();
|
||||||
}
|
}
|
||||||
auto commit_timestamp = hlc_response.new_hlc;
|
auto commit_timestamp = hlc_response.new_hlc;
|
||||||
|
|
||||||
@ -223,14 +225,14 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
|||||||
return shards_map_.GetLabelId(name).value();
|
return shards_map_.GetLabelId(name).value();
|
||||||
}
|
}
|
||||||
|
|
||||||
const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const override {
|
const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override {
|
||||||
return shards_map_.GetPropertyName(prop);
|
return properties_.IdToName(id.AsUint());
|
||||||
}
|
}
|
||||||
const std::string &LabelToName(memgraph::storage::v3::LabelId label) const override {
|
const std::string &LabelToName(memgraph::storage::v3::LabelId id) const override {
|
||||||
return shards_map_.GetLabelName(label);
|
return labels_.IdToName(id.AsUint());
|
||||||
}
|
}
|
||||||
const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const override {
|
const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId id) const override {
|
||||||
return shards_map_.GetEdgeTypeName(type);
|
return edge_types_.IdToName(id.AsUint());
|
||||||
}
|
}
|
||||||
|
|
||||||
bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override {
|
bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override {
|
||||||
@ -358,7 +360,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
|||||||
std::vector<VertexAccessor> accessors;
|
std::vector<VertexAccessor> accessors;
|
||||||
for (auto &response : responses) {
|
for (auto &response : responses) {
|
||||||
for (auto &result_row : response.results) {
|
for (auto &result_row : response.results) {
|
||||||
accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props)));
|
accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props), this));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return accessors;
|
return accessors;
|
||||||
@ -697,7 +699,28 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SetUpNameIdMappers() {
|
||||||
|
std::unordered_map<uint64_t, std::string> id_to_name;
|
||||||
|
for (const auto &[name, id] : shards_map_.labels) {
|
||||||
|
id_to_name.emplace(id.AsUint(), name);
|
||||||
|
}
|
||||||
|
labels_.StoreMapping(std::move(id_to_name));
|
||||||
|
id_to_name.clear();
|
||||||
|
for (const auto &[name, id] : shards_map_.properties) {
|
||||||
|
id_to_name.emplace(id.AsUint(), name);
|
||||||
|
}
|
||||||
|
properties_.StoreMapping(std::move(id_to_name));
|
||||||
|
id_to_name.clear();
|
||||||
|
for (const auto &[name, id] : shards_map_.edge_types) {
|
||||||
|
id_to_name.emplace(id.AsUint(), name);
|
||||||
|
}
|
||||||
|
edge_types_.StoreMapping(std::move(id_to_name));
|
||||||
|
}
|
||||||
|
|
||||||
ShardMap shards_map_;
|
ShardMap shards_map_;
|
||||||
|
storage::v3::NameIdMapper properties_;
|
||||||
|
storage::v3::NameIdMapper edge_types_;
|
||||||
|
storage::v3::NameIdMapper labels_;
|
||||||
CoordinatorClient coord_cli_;
|
CoordinatorClient coord_cli_;
|
||||||
RsmStorageClientManager<StorageClient> storage_cli_manager_;
|
RsmStorageClientManager<StorageClient> storage_cli_manager_;
|
||||||
memgraph::io::Io<TTransport> io_;
|
memgraph::io::Io<TTransport> io_;
|
||||||
|
@ -53,6 +53,10 @@ class NameIdMapper final {
|
|||||||
return it->second;
|
return it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const auto &GetIdToNameMap() const { return id_to_name_; }
|
||||||
|
|
||||||
|
const auto &GetNameToIdMap() const { return name_to_id_; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Necessary for comparison with string_view nad string
|
// Necessary for comparison with string_view nad string
|
||||||
// https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0919r1.html
|
// https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0919r1.html
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
|
|
||||||
#include "parser/opencypher/parser.hpp"
|
#include "parser/opencypher/parser.hpp"
|
||||||
#include "query/v2/requests.hpp"
|
#include "query/v2/requests.hpp"
|
||||||
|
#include "storage/v2/view.hpp"
|
||||||
#include "storage/v3/bindings/ast/ast.hpp"
|
#include "storage/v3/bindings/ast/ast.hpp"
|
||||||
#include "storage/v3/bindings/cypher_main_visitor.hpp"
|
#include "storage/v3/bindings/cypher_main_visitor.hpp"
|
||||||
#include "storage/v3/bindings/db_accessor.hpp"
|
#include "storage/v3/bindings/db_accessor.hpp"
|
||||||
@ -113,6 +114,24 @@ std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::optional<std::map<PropertyId, Value>> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view,
|
||||||
|
const Schemas::Schema *schema) {
|
||||||
|
std::map<PropertyId, Value> ret;
|
||||||
|
auto props = acc.Properties(view);
|
||||||
|
auto maybe_pk = acc.PrimaryKey(view);
|
||||||
|
if (maybe_pk.HasError()) {
|
||||||
|
spdlog::debug("Encountered an error while trying to get vertex primary key.");
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
auto &pk = maybe_pk.GetValue();
|
||||||
|
MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!");
|
||||||
|
for (size_t i{0}; i < schema->second.size(); ++i) {
|
||||||
|
ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i])));
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view,
|
std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view,
|
||||||
const Schemas::Schema *schema) {
|
const Schemas::Schema *schema) {
|
||||||
std::map<PropertyId, Value> ret;
|
std::map<PropertyId, Value> ret;
|
||||||
@ -129,17 +148,9 @@ std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(cons
|
|||||||
});
|
});
|
||||||
properties.clear();
|
properties.clear();
|
||||||
|
|
||||||
// TODO(antaljanosbenjamin): Once the VertexAccessor::Properties returns also the primary keys, we can get rid of this
|
auto pks = PrimaryKeysFromAccessor(acc, view, schema);
|
||||||
// code.
|
if (pks) {
|
||||||
auto maybe_pk = acc.PrimaryKey(view);
|
ret.merge(*pks);
|
||||||
if (maybe_pk.HasError()) {
|
|
||||||
spdlog::debug("Encountered an error while trying to get vertex primary key.");
|
|
||||||
}
|
|
||||||
|
|
||||||
auto &pk = maybe_pk.GetValue();
|
|
||||||
MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!");
|
|
||||||
for (size_t i{0}; i < schema->second.size(); ++i) {
|
|
||||||
ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i])));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
@ -190,7 +201,9 @@ std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccesso
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc,
|
std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc,
|
||||||
const msgs::ExpandOneRequest &req) {
|
const msgs::ExpandOneRequest &req,
|
||||||
|
storage::v3::View view,
|
||||||
|
const Schemas::Schema *schema) {
|
||||||
std::map<PropertyId, Value> src_vertex_properties;
|
std::map<PropertyId, Value> src_vertex_properties;
|
||||||
|
|
||||||
if (!req.src_vertex_properties) {
|
if (!req.src_vertex_properties) {
|
||||||
@ -204,6 +217,10 @@ std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const st
|
|||||||
for (auto &[key, val] : props.GetValue()) {
|
for (auto &[key, val] : props.GetValue()) {
|
||||||
src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val))));
|
src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val))));
|
||||||
}
|
}
|
||||||
|
auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema);
|
||||||
|
if (pks) {
|
||||||
|
src_vertex_properties.merge(*pks);
|
||||||
|
}
|
||||||
|
|
||||||
} else if (req.src_vertex_properties.value().empty()) {
|
} else if (req.src_vertex_properties.value().empty()) {
|
||||||
// NOOP
|
// NOOP
|
||||||
@ -264,7 +281,6 @@ std::optional<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges(
|
|||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN);
|
in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN);
|
||||||
|
|
||||||
auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types);
|
auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types);
|
||||||
if (out_edges_result.HasError()) {
|
if (out_edges_result.HasError()) {
|
||||||
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
|
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
|
||||||
@ -303,7 +319,8 @@ bool FillEdges(const std::vector<EdgeAccessor> &edges, msgs::ExpandOneResultRow
|
|||||||
|
|
||||||
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||||
Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
|
Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
|
||||||
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler) {
|
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
|
||||||
|
const Schemas::Schema *schema) {
|
||||||
/// Fill up source vertex
|
/// Fill up source vertex
|
||||||
const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second));
|
const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second));
|
||||||
auto v_acc = acc.FindVertex(primary_key, View::NEW);
|
auto v_acc = acc.FindVertex(primary_key, View::NEW);
|
||||||
@ -312,9 +329,9 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
|||||||
if (!source_vertex) {
|
if (!source_vertex) {
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
std::optional<std::map<PropertyId, Value>> src_vertex_properties;
|
||||||
|
src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema);
|
||||||
|
|
||||||
/// Fill up source vertex properties
|
|
||||||
auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req);
|
|
||||||
if (!src_vertex_properties) {
|
if (!src_vertex_properties) {
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
@ -852,7 +869,8 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler);
|
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler,
|
||||||
|
shard_->GetSchema(shard_->PrimaryLabel()));
|
||||||
|
|
||||||
if (!result) {
|
if (!result) {
|
||||||
action_successful = false;
|
action_successful = false;
|
||||||
|
@ -33,8 +33,9 @@ std::ostream &operator<<(std::ostream &in, const std::vector<T> &vector) {
|
|||||||
return in;
|
return in;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename K, typename V>
|
namespace detail {
|
||||||
std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
|
template <typename T>
|
||||||
|
std::ostream &MapImpl(std::ostream &in, const T &map) {
|
||||||
in << "{";
|
in << "{";
|
||||||
bool first = true;
|
bool first = true;
|
||||||
for (const auto &[a, b] : map) {
|
for (const auto &[a, b] : map) {
|
||||||
@ -49,6 +50,17 @@ std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
|
|||||||
in << "}";
|
in << "}";
|
||||||
return in;
|
return in;
|
||||||
}
|
}
|
||||||
|
} // namespace detail
|
||||||
|
|
||||||
|
template <typename K, typename V>
|
||||||
|
std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
|
||||||
|
return detail::MapImpl(in, map);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename K, typename V, typename THash, typename Cmp>
|
||||||
|
std::ostream &operator<<(std::ostream &in, const std::unordered_map<K, V, THash, Cmp> &map) {
|
||||||
|
return detail::MapImpl(in, map);
|
||||||
|
}
|
||||||
|
|
||||||
template <typename K, typename V>
|
template <typename K, typename V>
|
||||||
std::ostream &operator<<(std::ostream &in, const std::unordered_map<K, V> &map) {
|
std::ostream &operator<<(std::ostream &in, const std::unordered_map<K, V> &map) {
|
||||||
|
@ -3,3 +3,8 @@ function(distributed_queries_e2e_python_files FILE_NAME)
|
|||||||
endfunction()
|
endfunction()
|
||||||
|
|
||||||
distributed_queries_e2e_python_files(distributed_queries.py)
|
distributed_queries_e2e_python_files(distributed_queries.py)
|
||||||
|
distributed_queries_e2e_python_files(unwind_collect.py)
|
||||||
|
distributed_queries_e2e_python_files(order_by_and_limit.py)
|
||||||
|
distributed_queries_e2e_python_files(distinct.py)
|
||||||
|
distributed_queries_e2e_python_files(optional_match.py)
|
||||||
|
distributed_queries_e2e_python_files(common.py)
|
||||||
|
44
tests/e2e/distributed_queries/common.py
Normal file
44
tests/e2e/distributed_queries/common.py
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
# Copyright 2022 Memgraph Ltd.
|
||||||
|
#
|
||||||
|
# Use of this software is governed by the Business Source License
|
||||||
|
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
# License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
#
|
||||||
|
# As of the Change Date specified in that file, in accordance with
|
||||||
|
# the Business Source License, use of this software will be governed
|
||||||
|
# by the Apache License, Version 2.0, included in the file
|
||||||
|
# licenses/APL.txt.
|
||||||
|
|
||||||
|
import typing
|
||||||
|
import mgclient
|
||||||
|
import sys
|
||||||
|
import pytest
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def connection():
|
||||||
|
connection = connect()
|
||||||
|
return connection
|
||||||
|
|
||||||
|
|
||||||
|
def connect(**kwargs) -> mgclient.Connection:
|
||||||
|
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
|
||||||
|
connection.autocommit = True
|
||||||
|
return connection
|
||||||
|
|
||||||
|
|
||||||
|
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
|
||||||
|
cursor.execute(query, params)
|
||||||
|
return cursor.fetchall()
|
||||||
|
|
||||||
|
|
||||||
|
def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
|
||||||
|
results = execute_and_fetch_all(cursor, query)
|
||||||
|
return len(results) == n
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_shard_manager_to_initialize():
|
||||||
|
# The ShardManager in memgraph takes some time to initialize
|
||||||
|
# the shards, thus we cannot just run the queries right away
|
||||||
|
time.sleep(3)
|
38
tests/e2e/distributed_queries/distinct.py
Normal file
38
tests/e2e/distributed_queries/distinct.py
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
# Copyright 2022 Memgraph Ltd.
|
||||||
|
#
|
||||||
|
# Use of this software is governed by the Business Source License
|
||||||
|
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
# License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
#
|
||||||
|
# As of the Change Date specified in that file, in accordance with
|
||||||
|
# the Business Source License, use of this software will be governed
|
||||||
|
# by the Apache License, Version 2.0, included in the file
|
||||||
|
# licenses/APL.txt.
|
||||||
|
|
||||||
|
import typing
|
||||||
|
import mgclient
|
||||||
|
import sys
|
||||||
|
import pytest
|
||||||
|
import time
|
||||||
|
from common import *
|
||||||
|
|
||||||
|
|
||||||
|
def test_distinct(connection):
|
||||||
|
wait_for_shard_manager_to_initialize()
|
||||||
|
cursor = connection.cursor()
|
||||||
|
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:0})", 0)
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
|
||||||
|
assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0)
|
||||||
|
assert has_n_result_row(cursor, "MATCH (n)-[r]->(m) RETURN r", 4)
|
||||||
|
|
||||||
|
results = execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN DISTINCT m")
|
||||||
|
assert len(results) == 2
|
||||||
|
for i, n in enumerate(results):
|
||||||
|
n_props = n[0].properties
|
||||||
|
assert len(n_props) == 1
|
||||||
|
assert n_props["property"] == i
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(pytest.main([__file__, "-rA"]))
|
@ -14,34 +14,7 @@ import mgclient
|
|||||||
import sys
|
import sys
|
||||||
import pytest
|
import pytest
|
||||||
import time
|
import time
|
||||||
|
from common import *
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
|
||||||
def connection():
|
|
||||||
connection = connect()
|
|
||||||
return connection
|
|
||||||
|
|
||||||
|
|
||||||
def connect(**kwargs) -> mgclient.Connection:
|
|
||||||
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
|
|
||||||
connection.autocommit = True
|
|
||||||
return connection
|
|
||||||
|
|
||||||
|
|
||||||
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
|
|
||||||
cursor.execute(query, params)
|
|
||||||
return cursor.fetchall()
|
|
||||||
|
|
||||||
|
|
||||||
def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
|
|
||||||
results = execute_and_fetch_all(cursor, query)
|
|
||||||
return len(results) == n
|
|
||||||
|
|
||||||
|
|
||||||
def wait_for_shard_manager_to_initialize():
|
|
||||||
# The ShardManager in memgraph takes some time to initialize
|
|
||||||
# the shards, thus we cannot just run the queries right away
|
|
||||||
time.sleep(3)
|
|
||||||
|
|
||||||
|
|
||||||
def test_vertex_creation_and_scanall(connection):
|
def test_vertex_creation_and_scanall(connection):
|
||||||
@ -62,7 +35,7 @@ def test_vertex_creation_and_scanall(connection):
|
|||||||
assert len(results) == 9
|
assert len(results) == 9
|
||||||
for (n, r, m) in results:
|
for (n, r, m) in results:
|
||||||
n_props = n.properties
|
n_props = n.properties
|
||||||
assert len(n_props) == 0, "n is not expected to have properties, update the test!"
|
assert len(n_props) == 1, "n is not expected to have properties, update the test!"
|
||||||
assert len(n.labels) == 0, "n is not expected to have labels, update the test!"
|
assert len(n.labels) == 0, "n is not expected to have labels, update the test!"
|
||||||
|
|
||||||
assert r.type == "TO"
|
assert r.type == "TO"
|
||||||
|
37
tests/e2e/distributed_queries/optional_match.py
Normal file
37
tests/e2e/distributed_queries/optional_match.py
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
# Copyright 2022 Memgraph Ltd.
|
||||||
|
#
|
||||||
|
# Use of this software is governed by the Business Source License
|
||||||
|
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
# License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
#
|
||||||
|
# As of the Change Date specified in that file, in accordance with
|
||||||
|
# the Business Source License, use of this software will be governed
|
||||||
|
# by the Apache License, Version 2.0, included in the file
|
||||||
|
# licenses/APL.txt.
|
||||||
|
|
||||||
|
import typing
|
||||||
|
import mgclient
|
||||||
|
import sys
|
||||||
|
import pytest
|
||||||
|
import time
|
||||||
|
from common import *
|
||||||
|
|
||||||
|
|
||||||
|
def test_optional_match(connection):
|
||||||
|
wait_for_shard_manager_to_initialize()
|
||||||
|
cursor = connection.cursor()
|
||||||
|
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:0})", 0)
|
||||||
|
|
||||||
|
results = execute_and_fetch_all(
|
||||||
|
cursor, "MATCH (n:label) OPTIONAL MATCH (n:label)-[:TO]->(parent:label) RETURN parent"
|
||||||
|
)
|
||||||
|
assert len(results) == 1
|
||||||
|
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
|
||||||
|
assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0)
|
||||||
|
assert has_n_result_row(cursor, "MATCH (n:label) OPTIONAL MATCH (n)-[r:TO]->(m:label) RETURN r", 4)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(pytest.main([__file__, "-rA"]))
|
44
tests/e2e/distributed_queries/order_by_and_limit.py
Normal file
44
tests/e2e/distributed_queries/order_by_and_limit.py
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
# Copyright 2022 Memgraph Ltd.
|
||||||
|
#
|
||||||
|
# Use of this software is governed by the Business Source License
|
||||||
|
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
# License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
#
|
||||||
|
# As of the Change Date specified in that file, in accordance with
|
||||||
|
# the Business Source License, use of this software will be governed
|
||||||
|
# by the Apache License, Version 2.0, included in the file
|
||||||
|
# licenses/APL.txt.
|
||||||
|
|
||||||
|
import typing
|
||||||
|
import mgclient
|
||||||
|
import sys
|
||||||
|
import pytest
|
||||||
|
import time
|
||||||
|
from common import *
|
||||||
|
|
||||||
|
|
||||||
|
def test_order_by_and_limit(connection):
|
||||||
|
wait_for_shard_manager_to_initialize()
|
||||||
|
cursor = connection.cursor()
|
||||||
|
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0)
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:4})", 0)
|
||||||
|
|
||||||
|
results = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property DESC")
|
||||||
|
assert len(results) == 4
|
||||||
|
i = 4
|
||||||
|
for n in results:
|
||||||
|
n_props = n[0].properties
|
||||||
|
assert len(n_props) == 1
|
||||||
|
assert n_props["property"] == i
|
||||||
|
i = i - 1
|
||||||
|
|
||||||
|
result = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property LIMIT 1")
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0][0].properties["property"] == 1
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(pytest.main([__file__, "-rA"]))
|
34
tests/e2e/distributed_queries/unwind_collect.py
Normal file
34
tests/e2e/distributed_queries/unwind_collect.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
# Copyright 2022 Memgraph Ltd.
|
||||||
|
#
|
||||||
|
# Use of this software is governed by the Business Source License
|
||||||
|
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
# License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
#
|
||||||
|
# As of the Change Date specified in that file, in accordance with
|
||||||
|
# the Business Source License, use of this software will be governed
|
||||||
|
# by the Apache License, Version 2.0, included in the file
|
||||||
|
# licenses/APL.txt.
|
||||||
|
|
||||||
|
import typing
|
||||||
|
import mgclient
|
||||||
|
import sys
|
||||||
|
import pytest
|
||||||
|
import time
|
||||||
|
from common import *
|
||||||
|
|
||||||
|
|
||||||
|
def test_collect_unwind(connection):
|
||||||
|
wait_for_shard_manager_to_initialize()
|
||||||
|
cursor = connection.cursor()
|
||||||
|
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0)
|
||||||
|
assert has_n_result_row(cursor, "CREATE (n :label {property:4})", 0)
|
||||||
|
|
||||||
|
assert has_n_result_row(cursor, "MATCH (n) WITH collect(n) AS result RETURN result", 1)
|
||||||
|
assert has_n_result_row(cursor, "MATCH (n) WITH collect(n) AS nd UNWIND nd AS result RETURN result", 4)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(pytest.main([__file__, "-rA"]))
|
@ -11,3 +11,23 @@ workloads:
|
|||||||
binary: "tests/e2e/pytest_runner.sh"
|
binary: "tests/e2e/pytest_runner.sh"
|
||||||
args: ["distributed_queries/distributed_queries.py"]
|
args: ["distributed_queries/distributed_queries.py"]
|
||||||
<<: *template_cluster
|
<<: *template_cluster
|
||||||
|
|
||||||
|
- name: "Distributed unwind collect"
|
||||||
|
binary: "tests/e2e/pytest_runner.sh"
|
||||||
|
args: ["distributed_queries/unwind_collect.py"]
|
||||||
|
<<: *template_cluster
|
||||||
|
|
||||||
|
- name: "Distributed order by and limit"
|
||||||
|
binary: "tests/e2e/pytest_runner.sh"
|
||||||
|
args: ["distributed_queries/order_by_and_limit.py"]
|
||||||
|
<<: *template_cluster
|
||||||
|
|
||||||
|
- name: "Distributed distinct"
|
||||||
|
binary: "tests/e2e/pytest_runner.sh"
|
||||||
|
args: ["distributed_queries/distinct.py"]
|
||||||
|
<<: *template_cluster
|
||||||
|
|
||||||
|
- name: "Distributed optional match"
|
||||||
|
binary: "tests/e2e/pytest_runner.sh"
|
||||||
|
args: ["distributed_queries/optional_match.py"]
|
||||||
|
<<: *template_cluster
|
||||||
|
@ -12,7 +12,7 @@ PIP_DEPS=(
|
|||||||
"neo4j-driver==4.1.1"
|
"neo4j-driver==4.1.1"
|
||||||
"parse==1.18.0"
|
"parse==1.18.0"
|
||||||
"parse-type==0.5.2"
|
"parse-type==0.5.2"
|
||||||
"pytest==6.2.3"
|
"pytest==6.2.5"
|
||||||
"pyyaml==5.4.1"
|
"pyyaml==5.4.1"
|
||||||
"six==1.15.0"
|
"six==1.15.0"
|
||||||
)
|
)
|
||||||
@ -36,12 +36,12 @@ PYTHON_MINOR=$(python3 -c 'import sys; print(sys.version_info[:][1])')
|
|||||||
# NOTE (2021-11-15): PyPi doesn't contain pulsar-client for Python 3.9 so we have to use
|
# NOTE (2021-11-15): PyPi doesn't contain pulsar-client for Python 3.9 so we have to use
|
||||||
# our manually built wheel file. When they update the repository, pulsar-client can be
|
# our manually built wheel file. When they update the repository, pulsar-client can be
|
||||||
# added as a regular PIP dependancy
|
# added as a regular PIP dependancy
|
||||||
if [ $PYTHON_MINOR -lt 9 ]; then
|
#if [ $PYTHON_MINOR -lt 9 ]; then
|
||||||
pip --timeout 1000 install "pulsar-client==2.8.1"
|
# pip --timeout 1000 install "pulsar-client==2.8.1"
|
||||||
else
|
#else
|
||||||
pip --timeout 1000 install https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pulsar_client-2.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl
|
# pip --timeout 1000 install https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pulsar_client-2.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl
|
||||||
fi
|
#fi
|
||||||
|
#
|
||||||
for pkg in "${PIP_DEPS[@]}"; do
|
for pkg in "${PIP_DEPS[@]}"; do
|
||||||
pip --timeout 1000 install "$pkg"
|
pip --timeout 1000 install "$pkg"
|
||||||
done
|
done
|
||||||
|
Loading…
Reference in New Issue
Block a user