Merge branch 'project-pineapples' into T1079-MG-add-simple-query-to-benchmark_v2

This commit is contained in:
Jeremy B 2022-10-31 12:57:41 +01:00 committed by GitHub
commit 36a1c43851
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 602 additions and 260 deletions

View File

@ -219,3 +219,12 @@ jobs:
# Run simulation tests.
cd build
ctest -R memgraph__simulation --output-on-failure -j$THREADS
- name: Run e2e tests
run: |
# TODO(gitbuda): Setup mgclient and pymgclient properly.
cd tests
./setup.sh
source ve3/bin/activate
cd e2e
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-root-directory ./distributed_queries

View File

@ -9,6 +9,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <optional>
#include <unordered_map>
#include <vector>
@ -365,7 +366,6 @@ std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const {
if (const auto it = labels.find(label); it != labels.end()) {
return it->second;
}
return std::nullopt;
}
@ -382,7 +382,6 @@ std::optional<PropertyId> ShardMap::GetPropertyId(const std::string &property_na
if (const auto it = properties.find(property_name); it != properties.end()) {
return it->second;
}
return std::nullopt;
}
@ -399,7 +398,6 @@ std::optional<EdgeTypeId> ShardMap::GetEdgeTypeId(const std::string &edge_type)
if (const auto it = edge_types.find(edge_type); it != edge_types.end()) {
return it->second;
}
return std::nullopt;
}
@ -411,6 +409,7 @@ const std::string &ShardMap::GetEdgeTypeName(const EdgeTypeId property) const {
}
throw utils::BasicException("EdgeTypeId not found!");
}
Shards ShardMap::GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key,
const PrimaryKey &end_key) const {
MG_ASSERT(start_key <= end_key);

View File

@ -25,6 +25,7 @@
#include "io/address.hpp"
#include "storage/v3/config.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/name_id_mapper.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/temporal.hpp"
@ -120,9 +121,9 @@ struct ShardMap {
std::map<PropertyName, PropertyId> properties;
std::map<EdgeTypeName, EdgeTypeId> edge_types;
uint64_t max_label_id{kNotExistingId};
std::map<LabelName, LabelId> labels;
std::map<LabelId, LabelSpace> label_spaces;
std::map<LabelId, std::vector<SchemaProperty>> schemas;
std::map<LabelName, LabelId> labels;
[[nodiscard]] static ShardMap Parse(std::istream &input_stream);
friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map);

View File

@ -13,7 +13,7 @@
#ifndef MG_AST_INCLUDE_PATH
#ifdef MG_CLANG_TIDY_CHECK
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define MG_AST_INCLUDE_PATH "query/v2/frontend/ast/ast.hpp"
#include "query/v2/bindings/bindings.hpp"
#else
#error Missing AST include path
#endif
@ -21,8 +21,6 @@
#ifndef MG_INJECTED_NAMESPACE_NAME
#ifdef MG_CLANG_TIDY_CHECK
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define MG_INJECTED_NAMESPACE_NAME memgraph::query::v2
#else
#error Missing AST namespace
#endif

View File

@ -718,7 +718,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
TReturnType GetProperty(const TRecordAccessor &record_accessor, PropertyIx prop) {
auto maybe_prop = record_accessor.GetProperty(prop.name);
// Handler non existent property
return conv_(maybe_prop);
return conv_(maybe_prop, dba_);
}
template <class TRecordAccessor, class TTag = Tag,
@ -726,7 +726,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
TReturnType GetProperty(const TRecordAccessor &record_accessor, const std::string_view name) {
auto maybe_prop = record_accessor.GetProperty(std::string(name));
// Handler non existent property
return conv_(maybe_prop);
return conv_(maybe_prop, dba_);
}
template <class TRecordAccessor, class TTag = Tag,

View File

@ -18,6 +18,7 @@
#include "coordinator/shard_map.hpp"
#include "query/v2/accessors.hpp"
#include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/edge_accessor.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/result.hpp"
@ -70,51 +71,52 @@ query::v2::TypedValue ToTypedValue(const Value &value) {
}
}
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const query::v2::accessors::VertexAccessor &vertex,
const coordinator::ShardMap &shard_map,
storage::v3::View /*view*/) {
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(
const query::v2::accessors::VertexAccessor &vertex, const msgs::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View /*view*/) {
auto id = communication::bolt::Id::FromUint(0);
auto labels = vertex.Labels();
std::vector<std::string> new_labels;
new_labels.reserve(labels.size());
for (const auto &label : labels) {
new_labels.push_back(shard_map.GetLabelName(label.id));
new_labels.push_back(shard_request_manager->LabelToName(label.id));
}
auto properties = vertex.Properties();
std::map<std::string, Value> new_properties;
for (const auto &[prop, property_value] : properties) {
new_properties[shard_map.GetPropertyName(prop)] = ToBoltValue(property_value);
new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value);
}
return communication::bolt::Vertex{id, new_labels, new_properties};
}
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const query::v2::accessors::EdgeAccessor &edge,
const coordinator::ShardMap &shard_map,
storage::v3::View /*view*/) {
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(
const query::v2::accessors::EdgeAccessor &edge, const msgs::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View /*view*/) {
// TODO(jbajic) Fix bolt communication
auto id = communication::bolt::Id::FromUint(0);
auto from = communication::bolt::Id::FromUint(0);
auto to = communication::bolt::Id::FromUint(0);
const auto &type = shard_map.GetEdgeTypeName(edge.EdgeType());
const auto &type = shard_request_manager->EdgeTypeToName(edge.EdgeType());
auto properties = edge.Properties();
std::map<std::string, Value> new_properties;
for (const auto &[prop, property_value] : properties) {
new_properties[shard_map.GetPropertyName(prop)] = ToBoltValue(property_value);
new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value);
}
return communication::bolt::Edge{id, from, to, type, new_properties};
}
storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path & /*edge*/,
const coordinator::ShardMap & /*shard_map*/,
storage::v3::View /*view*/) {
storage::v3::Result<communication::bolt::Path> ToBoltPath(
const query::v2::accessors::Path & /*edge*/, const msgs::ShardRequestManagerInterface * /*shard_request_manager*/,
storage::v3::View /*view*/) {
// TODO(jbajic) Fix bolt communication
return {storage::v3::Error::DELETED_OBJECT};
}
storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const coordinator::ShardMap &shard_map,
storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value,
const msgs::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view) {
switch (value.type()) {
case query::v2::TypedValue::Type::Null:
@ -131,7 +133,7 @@ storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const
std::vector<Value> values;
values.reserve(value.ValueList().size());
for (const auto &v : value.ValueList()) {
auto maybe_value = ToBoltValue(v, shard_map, view);
auto maybe_value = ToBoltValue(v, shard_request_manager, view);
if (maybe_value.HasError()) return maybe_value.GetError();
values.emplace_back(std::move(*maybe_value));
}
@ -140,24 +142,24 @@ storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const
case query::v2::TypedValue::Type::Map: {
std::map<std::string, Value> map;
for (const auto &kv : value.ValueMap()) {
auto maybe_value = ToBoltValue(kv.second, shard_map, view);
auto maybe_value = ToBoltValue(kv.second, shard_request_manager, view);
if (maybe_value.HasError()) return maybe_value.GetError();
map.emplace(kv.first, std::move(*maybe_value));
}
return Value(std::move(map));
}
case query::v2::TypedValue::Type::Vertex: {
auto maybe_vertex = ToBoltVertex(value.ValueVertex(), shard_map, view);
auto maybe_vertex = ToBoltVertex(value.ValueVertex(), shard_request_manager, view);
if (maybe_vertex.HasError()) return maybe_vertex.GetError();
return Value(std::move(*maybe_vertex));
}
case query::v2::TypedValue::Type::Edge: {
auto maybe_edge = ToBoltEdge(value.ValueEdge(), shard_map, view);
auto maybe_edge = ToBoltEdge(value.ValueEdge(), shard_request_manager, view);
if (maybe_edge.HasError()) return maybe_edge.GetError();
return Value(std::move(*maybe_edge));
}
case query::v2::TypedValue::Type::Path: {
auto maybe_path = ToBoltPath(value.ValuePath(), shard_map, view);
auto maybe_path = ToBoltPath(value.ValuePath(), shard_request_manager, view);
if (maybe_path.HasError()) return maybe_path.GetError();
return Value(std::move(*maybe_path));
}
@ -209,12 +211,6 @@ Value ToBoltValue(msgs::Value value) {
}
}
storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path & /*path*/,
const storage::v3::Shard & /*db*/,
storage::v3::View /*view*/) {
return communication::bolt::Path();
}
storage::v3::PropertyValue ToPropertyValue(const Value &value) {
switch (value.type()) {
case Value::Type::Null:

View File

@ -15,6 +15,7 @@
#include "communication/bolt/v1/value.hpp"
#include "coordinator/shard_map.hpp"
#include "query/v2/bindings/typed_value.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/result.hpp"
#include "storage/v3/shard.hpp"
@ -30,40 +31,40 @@ namespace memgraph::glue::v2 {
/// @param storage::v3::VertexAccessor for converting to
/// communication::bolt::Vertex.
/// @param coordinator::ShardMap shard_map getting label and property names.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting label and property names.
/// @param storage::v3::View for deciding which vertex attributes are visible.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const storage::v3::VertexAccessor &vertex,
const coordinator::ShardMap &shard_map,
storage::v3::View view);
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(
const storage::v3::VertexAccessor &vertex, const msgs::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view);
/// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge.
/// @param coordinator::ShardMap shard_map getting edge type and property names.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting edge type and property names.
/// @param storage::v3::View for deciding which edge attributes are visible.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const storage::v3::EdgeAccessor &edge,
const coordinator::ShardMap &shard_map,
storage::v3::View view);
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(
const storage::v3::EdgeAccessor &edge, const msgs::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view);
/// @param query::v2::Path for converting to communication::bolt::Path.
/// @param coordinator::ShardMap shard_map ToBoltVertex and ToBoltEdge.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path &path,
const coordinator::ShardMap &shard_map,
storage::v3::View view);
storage::v3::Result<communication::bolt::Path> ToBoltPath(
const query::v2::accessors::Path &path, const msgs::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view);
/// @param query::v2::TypedValue for converting to communication::bolt::Value.
/// @param coordinator::ShardMap shard_map ToBoltVertex and ToBoltEdge.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Value> ToBoltValue(const query::v2::TypedValue &value,
const coordinator::ShardMap &shard_map,
storage::v3::View view);
storage::v3::Result<communication::bolt::Value> ToBoltValue(
const query::v2::TypedValue &value, const msgs::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view);
query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value);
@ -73,7 +74,8 @@ storage::v3::PropertyValue ToPropertyValue(const communication::bolt::Value &val
communication::bolt::Value ToBoltValue(msgs::Value value);
communication::bolt::Value ToBoltValue(msgs::Value value, const coordinator::ShardMap &shard_map,
communication::bolt::Value ToBoltValue(msgs::Value value,
const msgs::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view);
} // namespace memgraph::glue::v2

View File

@ -407,9 +407,8 @@ DEFINE_string(organization_name, "", "Organization name.");
struct SessionData {
// Explicit constructor here to ensure that pointers to all objects are
// supplied.
SessionData(memgraph::coordinator::ShardMap &shard_map, memgraph::query::v2::InterpreterContext *interpreter_context)
: shard_map(&shard_map), interpreter_context(interpreter_context) {}
memgraph::coordinator::ShardMap *shard_map;
explicit SessionData(memgraph::query::v2::InterpreterContext *interpreter_context)
: interpreter_context(interpreter_context) {}
memgraph::query::v2::InterpreterContext *interpreter_context;
};
@ -424,7 +423,6 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
memgraph::communication::v2::OutputStream *output_stream)
: memgraph::communication::bolt::Session<memgraph::communication::v2::InputStream,
memgraph::communication::v2::OutputStream>(input_stream, output_stream),
shard_map_(data.shard_map),
interpreter_(data.interpreter_context),
endpoint_(endpoint) {}
@ -455,7 +453,7 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
std::map<std::string, memgraph::communication::bolt::Value> Pull(TEncoder *encoder, std::optional<int> n,
std::optional<int> qid) override {
TypedValueResultStream stream(encoder, *shard_map_);
TypedValueResultStream stream(encoder, interpreter_.GetShardRequestManager());
return PullResults(stream, n, qid);
}
@ -482,7 +480,8 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
const auto &summary = interpreter_.Pull(&stream, n, qid);
std::map<std::string, memgraph::communication::bolt::Value> decoded_summary;
for (const auto &kv : summary) {
auto maybe_value = memgraph::glue::v2::ToBoltValue(kv.second, *shard_map_, memgraph::storage::v3::View::NEW);
auto maybe_value = memgraph::glue::v2::ToBoltValue(kv.second, interpreter_.GetShardRequestManager(),
memgraph::storage::v3::View::NEW);
if (maybe_value.HasError()) {
switch (maybe_value.GetError()) {
case memgraph::storage::v3::Error::DELETED_OBJECT:
@ -507,14 +506,14 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
/// before forwarding the calls to original TEncoder.
class TypedValueResultStream {
public:
TypedValueResultStream(TEncoder *encoder, const memgraph::coordinator::ShardMap &shard_map)
: encoder_(encoder), shard_map_(&shard_map) {}
TypedValueResultStream(TEncoder *encoder, const memgraph::msgs::ShardRequestManagerInterface *shard_request_manager)
: encoder_(encoder), shard_request_manager_(shard_request_manager) {}
void Result(const std::vector<memgraph::query::v2::TypedValue> &values) {
std::vector<memgraph::communication::bolt::Value> decoded_values;
decoded_values.reserve(values.size());
for (const auto &v : values) {
auto maybe_value = memgraph::glue::v2::ToBoltValue(v, *shard_map_, memgraph::storage::v3::View::NEW);
auto maybe_value = memgraph::glue::v2::ToBoltValue(v, shard_request_manager_, memgraph::storage::v3::View::NEW);
if (maybe_value.HasError()) {
switch (maybe_value.GetError()) {
case memgraph::storage::v3::Error::DELETED_OBJECT:
@ -534,12 +533,8 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
private:
TEncoder *encoder_;
// NOTE: Needed only for ToBoltValue conversions
const memgraph::coordinator::ShardMap *shard_map_;
const memgraph::msgs::ShardRequestManagerInterface *shard_request_manager_{nullptr};
};
// NOTE: Needed only for ToBoltValue conversions
const memgraph::coordinator::ShardMap *shard_map_;
memgraph::query::v2::Interpreter interpreter_;
memgraph::communication::v2::ServerEndpoint endpoint_;
};
@ -680,7 +675,7 @@ int main(int argc, char **argv) {
std::move(io),
mm.CoordinatorAddress()};
SessionData session_data{sm, &interpreter_context};
SessionData session_data{&interpreter_context};
interpreter_context.auth = nullptr;
interpreter_context.auth_checker = nullptr;

View File

@ -11,38 +11,59 @@
#include "query/v2/accessors.hpp"
#include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/id_types.hpp"
namespace memgraph::query::v2::accessors {
EdgeAccessor::EdgeAccessor(Edge edge) : edge(std::move(edge)) {}
EdgeAccessor::EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager)
: edge(std::move(edge)), manager_(manager) {}
EdgeTypeId EdgeAccessor::EdgeType() const { return edge.type.id; }
const std::vector<std::pair<PropertyId, Value>> &EdgeAccessor::Properties() const {
return edge.properties;
// std::map<std::string, TypedValue> res;
// for (const auto &[name, value] : *properties) {
// res[name] = ValueToTypedValue(value);
// }
// return res;
}
const std::vector<std::pair<PropertyId, Value>> &EdgeAccessor::Properties() const { return edge.properties; }
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
Value EdgeAccessor::GetProperty(const std::string & /*prop_name*/) const {
// TODO(kostasrim) fix this
return {};
Value EdgeAccessor::GetProperty(const std::string &prop_name) const {
auto prop_id = manager_->NameToProperty(prop_name);
auto it = std::find_if(edge.properties.begin(), edge.properties.end(), [&](auto &pr) { return prop_id == pr.first; });
if (it == edge.properties.end()) {
return {};
}
return it->second;
}
const Edge &EdgeAccessor::GetEdge() const { return edge; }
bool EdgeAccessor::IsCycle() const { return edge.src == edge.dst; };
VertexAccessor EdgeAccessor::To() const { return VertexAccessor(Vertex{edge.dst}, {}); }
VertexAccessor EdgeAccessor::To() const {
return VertexAccessor(Vertex{edge.dst}, std::vector<std::pair<PropertyId, msgs::Value>>{}, manager_);
}
VertexAccessor EdgeAccessor::From() const { return VertexAccessor(Vertex{edge.src}, {}); }
VertexAccessor EdgeAccessor::From() const {
return VertexAccessor(Vertex{edge.src}, std::vector<std::pair<PropertyId, msgs::Value>>{}, manager_);
}
VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props)
: vertex(std::move(v)), properties(std::move(props)) {}
VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props,
const msgs::ShardRequestManagerInterface *manager)
: vertex(std::move(v)), properties(std::move(props)), manager_(manager) {}
VertexAccessor::VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props,
const msgs::ShardRequestManagerInterface *manager)
: vertex(std::move(v)), manager_(manager) {
properties.reserve(props.size());
for (auto &[id, value] : props) {
properties.emplace_back(std::make_pair(id, std::move(value)));
}
}
VertexAccessor::VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props,
const msgs::ShardRequestManagerInterface *manager)
: vertex(std::move(v)), manager_(manager) {
properties.reserve(props.size());
for (const auto &[id, value] : props) {
properties.emplace_back(std::make_pair(id, value));
}
}
Label VertexAccessor::PrimaryLabel() const { return vertex.id.first; }
@ -58,15 +79,16 @@ bool VertexAccessor::HasLabel(Label &label) const {
const std::vector<std::pair<PropertyId, Value>> &VertexAccessor::Properties() const { return properties; }
Value VertexAccessor::GetProperty(PropertyId prop_id) const {
return std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; })->second;
// return ValueToTypedValue(properties[prop_name]);
auto it = std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; });
if (it == properties.end()) {
return {};
}
return it->second;
}
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
Value VertexAccessor::GetProperty(const std::string & /*prop_name*/) const {
// TODO(kostasrim) Add string mapping
return {};
// return ValueToTypedValue(properties[prop_name]);
Value VertexAccessor::GetProperty(const std::string &prop_name) const {
return GetProperty(manager_->NameToProperty(prop_name));
}
msgs::Vertex VertexAccessor::GetVertex() const { return vertex; }

View File

@ -11,6 +11,7 @@
#pragma once
#include <map>
#include <optional>
#include <utility>
#include <vector>
@ -23,6 +24,10 @@
#include "utils/memory.hpp"
#include "utils/memory_tracker.hpp"
namespace memgraph::msgs {
class ShardRequestManagerInterface;
} // namespace memgraph::msgs
namespace memgraph::query::v2::accessors {
using Value = memgraph::msgs::Value;
@ -36,7 +41,7 @@ class VertexAccessor;
class EdgeAccessor final {
public:
explicit EdgeAccessor(Edge edge);
explicit EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager);
[[nodiscard]] EdgeTypeId EdgeType() const;
@ -64,6 +69,7 @@ class EdgeAccessor final {
private:
Edge edge;
const msgs::ShardRequestManagerInterface *manager_;
};
class VertexAccessor final {
@ -71,7 +77,11 @@ class VertexAccessor final {
using PropertyId = msgs::PropertyId;
using Label = msgs::Label;
using VertexId = msgs::VertexId;
VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props);
VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props,
const msgs::ShardRequestManagerInterface *manager);
VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props, const msgs::ShardRequestManagerInterface *manager);
VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props, const msgs::ShardRequestManagerInterface *manager);
[[nodiscard]] Label PrimaryLabel() const;
@ -140,6 +150,7 @@ class VertexAccessor final {
private:
Vertex vertex;
std::vector<std::pair<PropertyId, Value>> properties;
const msgs::ShardRequestManagerInterface *manager_;
};
// inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }

View File

@ -23,6 +23,10 @@
#include "storage/v3/property_value.hpp"
#include "storage/v3/view.hpp"
namespace memgraph::msgs {
class ShardRequestManagerInterface;
} // namespace memgraph::msgs
namespace memgraph::query::v2 {
inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); };
@ -32,13 +36,14 @@ class Callable {
auto operator()(const memgraph::storage::v3::PropertyValue &val) const {
return memgraph::storage::v3::PropertyToTypedValue<TypedValue>(val);
};
auto operator()(const msgs::Value &val) const { return ValueToTypedValue(val); };
auto operator()(const msgs::Value &val, memgraph::msgs::ShardRequestManagerInterface *manager) const {
return ValueToTypedValue(val, manager);
};
};
} // namespace detail
using ExpressionEvaluator =
memgraph::expr::ExpressionEvaluator<TypedValue, memgraph::query::v2::EvaluationContext, DbAccessor,
storage::v3::View, storage::v3::LabelId, msgs::Value, detail::Callable,
memgraph::storage::v3::Error, memgraph::expr::QueryEngineTag>;
using ExpressionEvaluator = memgraph::expr::ExpressionEvaluator<
TypedValue, memgraph::query::v2::EvaluationContext, memgraph::msgs::ShardRequestManagerInterface, storage::v3::View,
storage::v3::LabelId, msgs::Value, detail::Callable, memgraph::storage::v3::Error, memgraph::expr::QueryEngineTag>;
} // namespace memgraph::query::v2

View File

@ -13,10 +13,11 @@
#include "bindings/typed_value.hpp"
#include "query/v2/accessors.hpp"
#include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
namespace memgraph::query::v2 {
inline TypedValue ValueToTypedValue(const msgs::Value &value) {
inline TypedValue ValueToTypedValue(const msgs::Value &value, msgs::ShardRequestManagerInterface *manager) {
using Value = msgs::Value;
switch (value.type) {
case Value::Type::Null:
@ -34,7 +35,7 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) {
std::vector<TypedValue> dst;
dst.reserve(lst.size());
for (const auto &elem : lst) {
dst.push_back(ValueToTypedValue(elem));
dst.push_back(ValueToTypedValue(elem, manager));
}
return TypedValue(std::move(dst));
}
@ -42,14 +43,15 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) {
const auto &value_map = value.map_v;
std::map<std::string, TypedValue> dst;
for (const auto &[key, val] : value_map) {
dst[key] = ValueToTypedValue(val);
dst[key] = ValueToTypedValue(val, manager);
}
return TypedValue(std::move(dst));
}
case Value::Type::Vertex:
return TypedValue(accessors::VertexAccessor(value.vertex_v, {}));
return TypedValue(accessors::VertexAccessor(
value.vertex_v, std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>{}, manager));
case Value::Type::Edge:
return TypedValue(accessors::EdgeAccessor(value.edge_v));
return TypedValue(accessors::EdgeAccessor(value.edge_v, manager));
}
throw std::runtime_error("Incorrect type in conversion");
}

View File

@ -22,8 +22,8 @@
#include "query/v2/bindings/typed_value.hpp"
#include "query/v2/conversions.hpp"
#include "query/v2/db_accessor.hpp"
#include "query/v2/exceptions.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/conversions.hpp"
#include "utils/string.hpp"
#include "utils/temporal.hpp"

View File

@ -16,12 +16,15 @@
#include <unordered_map>
#include "query/v2/bindings/typed_value.hpp"
#include "query/v2/db_accessor.hpp"
#include "storage/v3/view.hpp"
#include "utils/memory.hpp"
namespace memgraph::query::v2 {
namespace memgraph::msgs {
class ShardRequestManagerInterface;
} // namespace memgraph::msgs
class DbAccessor;
namespace memgraph::query::v2 {
namespace {
const char kStartsWith[] = "STARTSWITH";
@ -31,7 +34,9 @@ const char kId[] = "ID";
} // namespace
struct FunctionContext {
DbAccessor *db_accessor;
// TODO(kostasrim) consider optional here. ShardRequestManager does not exist on the storage.
// DbAccessor *db_accessor;
msgs::ShardRequestManagerInterface *manager;
utils::MemoryResource *memory;
int64_t timestamp;
std::unordered_map<std::string, int64_t> *counters;

View File

@ -144,7 +144,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred.
Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters &parameters,
DbAccessor *db_accessor) {
msgs::ShardRequestManagerInterface *manager) {
// Empty frame for evaluation of password expression. This is OK since
// password should be either null or string literal and it's evaluation
// should not depend on frame.
@ -155,7 +155,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
// the argument to Callback.
evaluation_context.timestamp = QueryTimestamp();
evaluation_context.parameters = parameters;
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
std::string username = auth_query->user_;
std::string rolename = auth_query->role_;
@ -313,7 +313,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
}
Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &parameters,
InterpreterContext *interpreter_context, DbAccessor *db_accessor,
InterpreterContext *interpreter_context, msgs::ShardRequestManagerInterface *manager,
std::vector<Notification> *notifications) {
expr::Frame<TypedValue> frame(0);
SymbolTable symbol_table;
@ -322,7 +322,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
// the argument to Callback.
evaluation_context.timestamp = QueryTimestamp();
evaluation_context.parameters = parameters;
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
Callback callback;
switch (repl_query->action_) {
@ -448,7 +448,8 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
}
}
Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters &parameters, DbAccessor *db_accessor) {
Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters &parameters,
msgs::ShardRequestManagerInterface *manager) {
expr::Frame<TypedValue> frame(0);
SymbolTable symbol_table;
EvaluationContext evaluation_context;
@ -458,7 +459,7 @@ Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters &param
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
evaluation_context.parameters = parameters;
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
Callback callback;
switch (setting_query->action_) {
@ -886,7 +887,8 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
EvaluationContext evaluation_context;
evaluation_context.timestamp = QueryTimestamp();
evaluation_context.parameters = parsed_query.parameters;
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::v3::View::OLD);
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager,
storage::v3::View::OLD);
const auto memory_limit =
expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
if (memory_limit) {
@ -901,7 +903,6 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
"convert the parsed row values to the appropriate type. This can be done using the built-in "
"conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
}
shard_request_manager->StartTransaction();
auto plan = CypherQueryToPlan(
parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query, parsed_query.parameters,
parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, shard_request_manager);
@ -957,10 +958,10 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN");
auto cypher_query_plan =
CypherQueryToPlan(parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage),
cypher_query, parsed_inner_query.parameters,
parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, nullptr);
auto cypher_query_plan = CypherQueryToPlan(
parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query,
parsed_inner_query.parameters, parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr,
shard_request_manager);
std::stringstream printed_plan;
plan::PrettyPrint(*shard_request_manager, &cypher_query_plan->plan(), &printed_plan);
@ -1030,7 +1031,8 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
EvaluationContext evaluation_context;
evaluation_context.timestamp = QueryTimestamp();
evaluation_context.parameters = parsed_inner_query.parameters;
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::v3::View::OLD);
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager,
storage::v3::View::OLD);
const auto memory_limit =
expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
@ -1179,14 +1181,15 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
DbAccessor *dba, utils::MemoryResource *execution_memory) {
DbAccessor *dba, utils::MemoryResource *execution_memory,
msgs::ShardRequestManagerInterface *manager) {
if (in_explicit_transaction) {
throw UserModificationInMulticommandTxException();
}
auto *auth_query = utils::Downcast<AuthQuery>(parsed_query.query);
auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, dba);
auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, manager);
SymbolTable symbol_table;
std::vector<Symbol> output_symbols;
@ -1215,14 +1218,14 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa
PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
DbAccessor *dba) {
msgs::ShardRequestManagerInterface *manager) {
if (in_explicit_transaction) {
throw ReplicationModificationInMulticommandTxException();
}
auto *replication_query = utils::Downcast<ReplicationQuery>(parsed_query.query);
auto callback =
HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, dba, notifications);
HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, manager, notifications);
return PreparedQuery{callback.header, std::move(parsed_query.required_privileges),
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
@ -1310,14 +1313,15 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli
throw SemanticException("CreateSnapshot query is not supported!");
}
PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, DbAccessor *dba) {
PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
msgs::ShardRequestManagerInterface *manager) {
if (in_explicit_transaction) {
throw SettingConfigInMulticommandTxException{};
}
auto *setting_query = utils::Downcast<SettingQuery>(parsed_query.query);
MG_ASSERT(setting_query);
auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, dba);
auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, manager);
return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
@ -1511,6 +1515,11 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
ParsedQuery parsed_query =
ParseQuery(query_string, params, &interpreter_context_->ast_cache, interpreter_context_->config.query);
query_execution->summary["parsing_time"] = parsing_timer.Elapsed().count();
if (!in_explicit_transaction_ &&
(utils::Downcast<CypherQuery>(parsed_query.query) || utils::Downcast<ExplainQuery>(parsed_query.query) ||
utils::Downcast<ProfileQuery>(parsed_query.query))) {
shard_request_manager_->StartTransaction();
}
utils::Timer planning_timer;
PreparedQuery prepared_query;
@ -1533,9 +1542,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_,
&query_execution->notifications, interpreter_context_);
} else if (utils::Downcast<AuthQuery>(parsed_query.query)) {
prepared_query = PrepareAuthQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
interpreter_context_, &*execution_db_accessor_,
&query_execution->execution_memory_with_exception);
prepared_query = PrepareAuthQuery(
std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_,
&*execution_db_accessor_, &query_execution->execution_memory_with_exception, shard_request_manager_.get());
} else if (utils::Downcast<InfoQuery>(parsed_query.query)) {
prepared_query = PrepareInfoQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
interpreter_context_, interpreter_context_->db,
@ -1546,7 +1555,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
} else if (utils::Downcast<ReplicationQuery>(parsed_query.query)) {
prepared_query =
PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
interpreter_context_, &*execution_db_accessor_);
interpreter_context_, shard_request_manager_.get());
} else if (utils::Downcast<LockPathQuery>(parsed_query.query)) {
prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
&*execution_db_accessor_);
@ -1563,7 +1572,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
prepared_query =
PrepareCreateSnapshotQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
} else if (utils::Downcast<SettingQuery>(parsed_query.query)) {
prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, &*execution_db_accessor_);
prepared_query =
PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, shard_request_manager_.get());
} else if (utils::Downcast<VersionQuery>(parsed_query.query)) {
prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_);
} else if (utils::Downcast<SchemaQuery>(parsed_query.query)) {

View File

@ -296,6 +296,8 @@ class Interpreter final {
*/
void Abort();
const msgs::ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); }
private:
struct QueryExecution {
std::optional<PreparedQuery> prepared_query;

View File

@ -169,6 +169,7 @@ class DistributedCreateNodeCursor : public Cursor {
if (input_cursor_->Pull(frame, context)) {
auto &shard_manager = context.shard_request_manager;
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
PlaceNodeOnTheFrame(frame, context);
return true;
}
@ -179,8 +180,19 @@ class DistributedCreateNodeCursor : public Cursor {
void Reset() override { state_ = {}; }
std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) const {
void PlaceNodeOnTheFrame(Frame &frame, ExecutionContext &context) {
// TODO(kostasrim) Make this work with batching
const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]};
msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[0])};
frame[nodes_info_.front()->symbol] = TypedValue(
query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.shard_request_manager));
}
std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) {
std::vector<msgs::NewVertex> requests;
// TODO(kostasrim) this assertion should be removed once we support multiple vertex creation
MG_ASSERT(nodes_info_.size() == 1);
msgs::PrimaryKey pk;
for (const auto &node_info : nodes_info_) {
msgs::NewVertex rqst;
MG_ASSERT(!node_info->labels.empty(), "Cannot determine primary label");
@ -188,17 +200,14 @@ class DistributedCreateNodeCursor : public Cursor {
// TODO(jbajic) Fix properties not send,
// suggestion: ignore distinction between properties and primary keys
// since schema validation is done on storage side
std::map<msgs::PropertyId, msgs::Value> properties;
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
storage::v3::View::NEW);
if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) {
for (const auto &[key, value_expression] : *node_info_properties) {
TypedValue val = value_expression->Accept(evaluator);
if (context.shard_request_manager->IsPrimaryKey(primary_label, key)) {
rqst.primary_key.push_back(TypedValueToValue(val));
} else {
properties[key] = TypedValueToValue(val);
pk.push_back(TypedValueToValue(val));
}
}
} else {
@ -207,9 +216,8 @@ class DistributedCreateNodeCursor : public Cursor {
auto key_str = std::string(key);
auto property_id = context.shard_request_manager->NameToProperty(key_str);
if (context.shard_request_manager->IsPrimaryKey(primary_label, property_id)) {
rqst.primary_key.push_back(storage::v3::TypedValueToValue(value));
} else {
properties[property_id] = TypedValueToValue(value);
rqst.primary_key.push_back(TypedValueToValue(value));
pk.push_back(TypedValueToValue(value));
}
}
}
@ -219,14 +227,18 @@ class DistributedCreateNodeCursor : public Cursor {
}
// TODO(kostasrim) Copy non primary labels as well
rqst.label_ids.push_back(msgs::Label{.id = primary_label});
src_vertex_props_.push_back(rqst.properties);
requests.push_back(std::move(rqst));
}
primary_keys_.push_back(std::move(pk));
return requests;
}
private:
const UniqueCursorPtr input_cursor_;
std::vector<const NodeCreationInfo *> nodes_info_;
std::vector<std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>> src_vertex_props_;
std::vector<msgs::PrimaryKey> primary_keys_;
msgs::ExecutionState<msgs::CreateVerticesRequest> state_;
};
@ -687,7 +699,7 @@ bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) {
// Like all filters, newly set values should not affect filtering of old
// nodes and edges.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.shard_request_manager,
storage::v3::View::OLD);
while (input_cursor_->Pull(frame, context)) {
if (EvaluateFilter(evaluator, self_.expression_)) return true;
@ -728,8 +740,8 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) {
if (input_cursor_->Pull(frame, context)) {
// Produce should always yield the latest results.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
storage::v3::View::NEW);
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
context.shard_request_manager, storage::v3::View::NEW);
for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator);
return true;
@ -1149,8 +1161,8 @@ class AggregateCursor : public Cursor {
* aggregation results, and not on the number of inputs.
*/
void ProcessAll(Frame *frame, ExecutionContext *context) {
ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, context->db_accessor,
storage::v3::View::NEW);
ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context,
context->shard_request_manager, storage::v3::View::NEW);
while (input_cursor_->Pull(*frame, *context)) {
ProcessOne(*frame, &evaluator);
}
@ -1370,8 +1382,8 @@ bool Skip::SkipCursor::Pull(Frame &frame, ExecutionContext &context) {
// First successful pull from the input, evaluate the skip expression.
// The skip expression doesn't contain identifiers so graph view
// parameter is not important.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
storage::v3::View::OLD);
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
context.shard_request_manager, storage::v3::View::OLD);
TypedValue to_skip = self_.expression_->Accept(evaluator);
if (to_skip.type() != TypedValue::Type::Int)
throw QueryRuntimeException("Number of elements to skip must be an integer.");
@ -1425,8 +1437,8 @@ bool Limit::LimitCursor::Pull(Frame &frame, ExecutionContext &context) {
if (limit_ == -1) {
// Limit expression doesn't contain identifiers so graph view is not
// important.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
storage::v3::View::OLD);
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
context.shard_request_manager, storage::v3::View::OLD);
TypedValue limit = self_.expression_->Accept(evaluator);
if (limit.type() != TypedValue::Type::Int)
throw QueryRuntimeException("Limit on number of returned elements must be an integer.");
@ -1481,8 +1493,8 @@ class OrderByCursor : public Cursor {
SCOPED_PROFILE_OP("OrderBy");
if (!did_pull_all_) {
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
storage::v3::View::OLD);
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
context.shard_request_manager, storage::v3::View::OLD);
auto *mem = cache_.get_allocator().GetMemoryResource();
while (input_cursor_->Pull(frame, context)) {
// collect the order_by elements
@ -1739,8 +1751,8 @@ class UnwindCursor : public Cursor {
if (!input_cursor_->Pull(frame, context)) return false;
// successful pull from input, initialize value and iterator
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
storage::v3::View::OLD);
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
context.shard_request_manager, storage::v3::View::OLD);
TypedValue input_value = self_.input_expression_->Accept(evaluator);
if (input_value.type() != TypedValue::Type::List)
throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type());
@ -2217,7 +2229,7 @@ class LoadCsvCursor : public Cursor {
// self_->delimiter_, and self_->quote_ earlier (say, in the interpreter.cpp)
// without massacring the code even worse than I did here
if (UNLIKELY(!reader_)) {
reader_ = MakeReader(&context.evaluation_context);
reader_ = MakeReader(context);
}
bool input_pulled = input_cursor_->Pull(frame, context);
@ -2246,11 +2258,12 @@ class LoadCsvCursor : public Cursor {
void Shutdown() override { input_cursor_->Shutdown(); }
private:
csv::Reader MakeReader(EvaluationContext *eval_context) {
csv::Reader MakeReader(ExecutionContext &context) {
auto &eval_context = context.evaluation_context;
Frame frame(0);
SymbolTable symbol_table;
DbAccessor *dba = nullptr;
auto evaluator = ExpressionEvaluator(&frame, symbol_table, *eval_context, dba, storage::v3::View::OLD);
auto evaluator =
ExpressionEvaluator(&frame, symbol_table, eval_context, context.shard_request_manager, storage::v3::View::OLD);
auto maybe_file = ToOptionalString(&evaluator, self_->file_);
auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_);
@ -2287,8 +2300,8 @@ class ForeachCursor : public Cursor {
return false;
}
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
storage::v3::View::NEW);
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
context.shard_request_manager, storage::v3::View::NEW);
TypedValue expr_result = expression->Accept(evaluator);
if (expr_result.IsNull()) {
@ -2458,15 +2471,51 @@ class DistributedExpandCursor : public Cursor {
: self_(self),
input_cursor_(self.input_->MakeCursor(mem)),
current_in_edge_it_(current_in_edges_.begin()),
current_out_edge_it_(current_out_edges_.begin()) {
if (self_.common_.existing_node) {
throw QueryRuntimeException("Cannot use existing node with DistributedExpandOne cursor!");
}
}
current_out_edge_it_(current_out_edges_.begin()) {}
using VertexAccessor = accessors::VertexAccessor;
using EdgeAccessor = accessors::EdgeAccessor;
static constexpr auto DirectionToMsgsDirection(const auto direction) {
switch (direction) {
case EdgeAtom::Direction::IN:
return msgs::EdgeDirection::IN;
case EdgeAtom::Direction::OUT:
return msgs::EdgeDirection::OUT;
case EdgeAtom::Direction::BOTH:
return msgs::EdgeDirection::BOTH;
}
};
void PullDstVertex(Frame &frame, ExecutionContext &context, const EdgeAtom::Direction direction) {
if (self_.common_.existing_node) {
return;
}
MG_ASSERT(direction != EdgeAtom::Direction::BOTH);
const auto &edge = frame[self_.common_.edge_symbol].ValueEdge();
static auto get_dst_vertex = [&edge](const EdgeAtom::Direction direction) {
switch (direction) {
case EdgeAtom::Direction::IN:
return edge.From().Id();
case EdgeAtom::Direction::OUT:
return edge.To().Id();
case EdgeAtom::Direction::BOTH:
throw std::runtime_error("EdgeDirection Both not implemented");
}
};
msgs::ExpandOneRequest request;
// to not fetch any properties of the edges
request.edge_properties.emplace();
request.src_vertices.push_back(get_dst_vertex(direction));
request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN;
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front();
frame[self_.common_.node_symbol] = accessors::VertexAccessor(
msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.shard_request_manager);
}
bool InitEdges(Frame &frame, ExecutionContext &context) {
// Input Vertex could be null if it is created by a failed optional match. In
// those cases we skip that input pull and continue with the next.
@ -2480,44 +2529,41 @@ class DistributedExpandCursor : public Cursor {
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
auto &vertex = vertex_value.ValueVertex();
static constexpr auto direction_to_msgs_direction = [](const EdgeAtom::Direction direction) {
switch (direction) {
case EdgeAtom::Direction::IN:
return msgs::EdgeDirection::IN;
case EdgeAtom::Direction::OUT:
return msgs::EdgeDirection::OUT;
case EdgeAtom::Direction::BOTH:
return msgs::EdgeDirection::BOTH;
}
};
msgs::ExpandOneRequest request;
request.direction = direction_to_msgs_direction(self_.common_.direction);
request.direction = DirectionToMsgsDirection(self_.common_.direction);
// to not fetch any properties of the edges
request.edge_properties.emplace();
request.src_vertices.push_back(vertex.Id());
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front();
const auto convert_edges = [&vertex](
if (self_.common_.existing_node) {
const auto &node = frame[self_.common_.node_symbol].ValueVertex().Id();
auto &in = result_row.in_edges_with_specific_properties;
std::erase_if(in, [&node](auto &edge) { return edge.other_end != node; });
auto &out = result_row.out_edges_with_specific_properties;
std::erase_if(out, [&node](auto &edge) { return edge.other_end != node; });
}
const auto convert_edges = [&vertex, &context](
std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties> &&edge_messages,
const EdgeAtom::Direction direction) {
std::vector<EdgeAccessor> edge_accessors;
edge_accessors.reserve(edge_messages.size());
switch (direction) {
case EdgeAtom::Direction::IN: {
for (auto &edge : edge_messages) {
edge_accessors.emplace_back(
msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type});
edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type},
context.shard_request_manager);
}
break;
}
case EdgeAtom::Direction::OUT: {
for (auto &edge : edge_messages) {
edge_accessors.emplace_back(
msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type});
edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type},
context.shard_request_manager);
}
break;
}
@ -2527,12 +2573,13 @@ class DistributedExpandCursor : public Cursor {
}
return edge_accessors;
};
current_in_edges_ =
convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::IN);
current_in_edge_it_ = current_in_edges_.begin();
current_in_edges_ =
convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::OUT);
current_in_edge_it_ = current_in_edges_.begin();
current_out_edges_ =
convert_edges(std::move(result_row.out_edges_with_specific_properties), EdgeAtom::Direction::OUT);
current_out_edge_it_ = current_out_edges_.begin();
return true;
}
}
@ -2540,19 +2587,6 @@ class DistributedExpandCursor : public Cursor {
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("DistributedExpand");
// A helper function for expanding a node from an edge.
auto pull_node = [this, &frame](const EdgeAccessor &new_edge, EdgeAtom::Direction direction) {
if (self_.common_.existing_node) return;
switch (direction) {
case EdgeAtom::Direction::IN:
frame[self_.common_.node_symbol] = new_edge.From();
break;
case EdgeAtom::Direction::OUT:
frame[self_.common_.node_symbol] = new_edge.To();
break;
case EdgeAtom::Direction::BOTH:
LOG_FATAL("Must indicate exact expansion direction here");
}
};
while (true) {
if (MustAbort(context)) throw HintedAbortError();
@ -2561,7 +2595,7 @@ class DistributedExpandCursor : public Cursor {
auto &edge = *current_in_edge_it_;
++current_in_edge_it_;
frame[self_.common_.edge_symbol] = edge;
pull_node(edge, EdgeAtom::Direction::IN);
PullDstVertex(frame, context, EdgeAtom::Direction::IN);
return true;
}
@ -2573,7 +2607,7 @@ class DistributedExpandCursor : public Cursor {
continue;
};
frame[self_.common_.edge_symbol] = edge;
pull_node(edge, EdgeAtom::Direction::OUT);
PullDstVertex(frame, context, EdgeAtom::Direction::OUT);
return true;
}

View File

@ -387,7 +387,6 @@ struct GetPropertiesResponse {
enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 };
struct ExpandOneRequest {
// TODO(antaljanosbenjamin): Filtering based on the id of the other end of the edge?
Hlc transaction_id;
std::vector<VertexId> src_vertices;
// return types that type is in this list

View File

@ -171,6 +171,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
if (hlc_response.fresher_shard_map) {
shards_map_ = hlc_response.fresher_shard_map.value();
SetUpNameIdMappers();
}
}
@ -186,6 +187,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
if (hlc_response.fresher_shard_map) {
shards_map_ = hlc_response.fresher_shard_map.value();
SetUpNameIdMappers();
}
auto commit_timestamp = hlc_response.new_hlc;
@ -223,14 +225,14 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return shards_map_.GetLabelId(name).value();
}
const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const override {
return shards_map_.GetPropertyName(prop);
const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override {
return properties_.IdToName(id.AsUint());
}
const std::string &LabelToName(memgraph::storage::v3::LabelId label) const override {
return shards_map_.GetLabelName(label);
const std::string &LabelToName(memgraph::storage::v3::LabelId id) const override {
return labels_.IdToName(id.AsUint());
}
const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const override {
return shards_map_.GetEdgeTypeName(type);
const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId id) const override {
return edge_types_.IdToName(id.AsUint());
}
bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override {
@ -358,7 +360,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
std::vector<VertexAccessor> accessors;
for (auto &response : responses) {
for (auto &result_row : response.results) {
accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props)));
accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props), this));
}
}
return accessors;
@ -697,7 +699,28 @@ class ShardRequestManager : public ShardRequestManagerInterface {
}
}
void SetUpNameIdMappers() {
std::unordered_map<uint64_t, std::string> id_to_name;
for (const auto &[name, id] : shards_map_.labels) {
id_to_name.emplace(id.AsUint(), name);
}
labels_.StoreMapping(std::move(id_to_name));
id_to_name.clear();
for (const auto &[name, id] : shards_map_.properties) {
id_to_name.emplace(id.AsUint(), name);
}
properties_.StoreMapping(std::move(id_to_name));
id_to_name.clear();
for (const auto &[name, id] : shards_map_.edge_types) {
id_to_name.emplace(id.AsUint(), name);
}
edge_types_.StoreMapping(std::move(id_to_name));
}
ShardMap shards_map_;
storage::v3::NameIdMapper properties_;
storage::v3::NameIdMapper edge_types_;
storage::v3::NameIdMapper labels_;
CoordinatorClient coord_cli_;
RsmStorageClientManager<StorageClient> storage_cli_manager_;
memgraph::io::Io<TTransport> io_;

View File

@ -53,6 +53,10 @@ class NameIdMapper final {
return it->second;
}
const auto &GetIdToNameMap() const { return id_to_name_; }
const auto &GetNameToIdMap() const { return name_to_id_; }
private:
// Necessary for comparison with string_view nad string
// https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0919r1.html

View File

@ -17,6 +17,7 @@
#include "parser/opencypher/parser.hpp"
#include "query/v2/requests.hpp"
#include "storage/v2/view.hpp"
#include "storage/v3/bindings/ast/ast.hpp"
#include "storage/v3/bindings/cypher_main_visitor.hpp"
#include "storage/v3/bindings/db_accessor.hpp"
@ -113,6 +114,24 @@ std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor
return ret;
}
std::optional<std::map<PropertyId, Value>> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view,
const Schemas::Schema *schema) {
std::map<PropertyId, Value> ret;
auto props = acc.Properties(view);
auto maybe_pk = acc.PrimaryKey(view);
if (maybe_pk.HasError()) {
spdlog::debug("Encountered an error while trying to get vertex primary key.");
return std::nullopt;
}
auto &pk = maybe_pk.GetValue();
MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!");
for (size_t i{0}; i < schema->second.size(); ++i) {
ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i])));
}
return ret;
}
std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view,
const Schemas::Schema *schema) {
std::map<PropertyId, Value> ret;
@ -129,17 +148,9 @@ std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(cons
});
properties.clear();
// TODO(antaljanosbenjamin): Once the VertexAccessor::Properties returns also the primary keys, we can get rid of this
// code.
auto maybe_pk = acc.PrimaryKey(view);
if (maybe_pk.HasError()) {
spdlog::debug("Encountered an error while trying to get vertex primary key.");
}
auto &pk = maybe_pk.GetValue();
MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!");
for (size_t i{0}; i < schema->second.size(); ++i) {
ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i])));
auto pks = PrimaryKeysFromAccessor(acc, view, schema);
if (pks) {
ret.merge(*pks);
}
return ret;
@ -190,7 +201,9 @@ std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccesso
}
std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc,
const msgs::ExpandOneRequest &req) {
const msgs::ExpandOneRequest &req,
storage::v3::View view,
const Schemas::Schema *schema) {
std::map<PropertyId, Value> src_vertex_properties;
if (!req.src_vertex_properties) {
@ -204,6 +217,10 @@ std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const st
for (auto &[key, val] : props.GetValue()) {
src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val))));
}
auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema);
if (pks) {
src_vertex_properties.merge(*pks);
}
} else if (req.src_vertex_properties.value().empty()) {
// NOOP
@ -264,7 +281,6 @@ std::optional<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges(
return std::nullopt;
}
in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN);
auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types);
if (out_edges_result.HasError()) {
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
@ -303,7 +319,8 @@ bool FillEdges(const std::vector<EdgeAccessor> &edges, msgs::ExpandOneResultRow
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler) {
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
const Schemas::Schema *schema) {
/// Fill up source vertex
const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second));
auto v_acc = acc.FindVertex(primary_key, View::NEW);
@ -312,9 +329,9 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
if (!source_vertex) {
return std::nullopt;
}
std::optional<std::map<PropertyId, Value>> src_vertex_properties;
src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema);
/// Fill up source vertex properties
auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req);
if (!src_vertex_properties) {
return std::nullopt;
}
@ -852,7 +869,8 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
continue;
}
}
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler);
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler,
shard_->GetSchema(shard_->PrimaryLabel()));
if (!result) {
action_successful = false;

View File

@ -33,8 +33,9 @@ std::ostream &operator<<(std::ostream &in, const std::vector<T> &vector) {
return in;
}
template <typename K, typename V>
std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
namespace detail {
template <typename T>
std::ostream &MapImpl(std::ostream &in, const T &map) {
in << "{";
bool first = true;
for (const auto &[a, b] : map) {
@ -49,6 +50,17 @@ std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
in << "}";
return in;
}
} // namespace detail
template <typename K, typename V>
std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
return detail::MapImpl(in, map);
}
template <typename K, typename V, typename THash, typename Cmp>
std::ostream &operator<<(std::ostream &in, const std::unordered_map<K, V, THash, Cmp> &map) {
return detail::MapImpl(in, map);
}
template <typename K, typename V>
std::ostream &operator<<(std::ostream &in, const std::unordered_map<K, V> &map) {

View File

@ -3,3 +3,8 @@ function(distributed_queries_e2e_python_files FILE_NAME)
endfunction()
distributed_queries_e2e_python_files(distributed_queries.py)
distributed_queries_e2e_python_files(unwind_collect.py)
distributed_queries_e2e_python_files(order_by_and_limit.py)
distributed_queries_e2e_python_files(distinct.py)
distributed_queries_e2e_python_files(optional_match.py)
distributed_queries_e2e_python_files(common.py)

View File

@ -0,0 +1,44 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
import sys
import pytest
import time
@pytest.fixture(autouse=True)
def connection():
connection = connect()
return connection
def connect(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
connection.autocommit = True
return connection
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
cursor.execute(query, params)
return cursor.fetchall()
def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
results = execute_and_fetch_all(cursor, query)
return len(results) == n
def wait_for_shard_manager_to_initialize():
# The ShardManager in memgraph takes some time to initialize
# the shards, thus we cannot just run the queries right away
time.sleep(3)

View File

@ -0,0 +1,38 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
import sys
import pytest
import time
from common import *
def test_distinct(connection):
wait_for_shard_manager_to_initialize()
cursor = connection.cursor()
assert has_n_result_row(cursor, "CREATE (n :label {property:0})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0)
assert has_n_result_row(cursor, "MATCH (n)-[r]->(m) RETURN r", 4)
results = execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN DISTINCT m")
assert len(results) == 2
for i, n in enumerate(results):
n_props = n[0].properties
assert len(n_props) == 1
assert n_props["property"] == i
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -14,34 +14,7 @@ import mgclient
import sys
import pytest
import time
@pytest.fixture(autouse=True)
def connection():
connection = connect()
return connection
def connect(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
connection.autocommit = True
return connection
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
cursor.execute(query, params)
return cursor.fetchall()
def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
results = execute_and_fetch_all(cursor, query)
return len(results) == n
def wait_for_shard_manager_to_initialize():
# The ShardManager in memgraph takes some time to initialize
# the shards, thus we cannot just run the queries right away
time.sleep(3)
from common import *
def test_vertex_creation_and_scanall(connection):
@ -62,7 +35,7 @@ def test_vertex_creation_and_scanall(connection):
assert len(results) == 9
for (n, r, m) in results:
n_props = n.properties
assert len(n_props) == 0, "n is not expected to have properties, update the test!"
assert len(n_props) == 1, "n is not expected to have properties, update the test!"
assert len(n.labels) == 0, "n is not expected to have labels, update the test!"
assert r.type == "TO"

View File

@ -0,0 +1,37 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
import sys
import pytest
import time
from common import *
def test_optional_match(connection):
wait_for_shard_manager_to_initialize()
cursor = connection.cursor()
assert has_n_result_row(cursor, "CREATE (n :label {property:0})", 0)
results = execute_and_fetch_all(
cursor, "MATCH (n:label) OPTIONAL MATCH (n:label)-[:TO]->(parent:label) RETURN parent"
)
assert len(results) == 1
assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0)
assert has_n_result_row(cursor, "MATCH (n:label) OPTIONAL MATCH (n)-[r:TO]->(m:label) RETURN r", 4)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -0,0 +1,44 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
import sys
import pytest
import time
from common import *
def test_order_by_and_limit(connection):
wait_for_shard_manager_to_initialize()
cursor = connection.cursor()
assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:4})", 0)
results = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property DESC")
assert len(results) == 4
i = 4
for n in results:
n_props = n[0].properties
assert len(n_props) == 1
assert n_props["property"] == i
i = i - 1
result = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property LIMIT 1")
assert len(result) == 1
assert result[0][0].properties["property"] == 1
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -0,0 +1,34 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
import sys
import pytest
import time
from common import *
def test_collect_unwind(connection):
wait_for_shard_manager_to_initialize()
cursor = connection.cursor()
assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:4})", 0)
assert has_n_result_row(cursor, "MATCH (n) WITH collect(n) AS result RETURN result", 1)
assert has_n_result_row(cursor, "MATCH (n) WITH collect(n) AS nd UNWIND nd AS result RETURN result", 4)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -11,3 +11,23 @@ workloads:
binary: "tests/e2e/pytest_runner.sh"
args: ["distributed_queries/distributed_queries.py"]
<<: *template_cluster
- name: "Distributed unwind collect"
binary: "tests/e2e/pytest_runner.sh"
args: ["distributed_queries/unwind_collect.py"]
<<: *template_cluster
- name: "Distributed order by and limit"
binary: "tests/e2e/pytest_runner.sh"
args: ["distributed_queries/order_by_and_limit.py"]
<<: *template_cluster
- name: "Distributed distinct"
binary: "tests/e2e/pytest_runner.sh"
args: ["distributed_queries/distinct.py"]
<<: *template_cluster
- name: "Distributed optional match"
binary: "tests/e2e/pytest_runner.sh"
args: ["distributed_queries/optional_match.py"]
<<: *template_cluster

View File

@ -12,7 +12,7 @@ PIP_DEPS=(
"neo4j-driver==4.1.1"
"parse==1.18.0"
"parse-type==0.5.2"
"pytest==6.2.3"
"pytest==6.2.5"
"pyyaml==5.4.1"
"six==1.15.0"
)
@ -36,12 +36,12 @@ PYTHON_MINOR=$(python3 -c 'import sys; print(sys.version_info[:][1])')
# NOTE (2021-11-15): PyPi doesn't contain pulsar-client for Python 3.9 so we have to use
# our manually built wheel file. When they update the repository, pulsar-client can be
# added as a regular PIP dependancy
if [ $PYTHON_MINOR -lt 9 ]; then
pip --timeout 1000 install "pulsar-client==2.8.1"
else
pip --timeout 1000 install https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pulsar_client-2.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl
fi
#if [ $PYTHON_MINOR -lt 9 ]; then
# pip --timeout 1000 install "pulsar-client==2.8.1"
#else
# pip --timeout 1000 install https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pulsar_client-2.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl
#fi
#
for pkg in "${PIP_DEPS[@]}"; do
pip --timeout 1000 install "$pkg"
done