Merge branch 'T1079-MG-add-simple-query-to-benchmark_v2' of github.com:memgraph/memgraph into T1079-MG-add-simple-query-to-benchmark_v2

commit 60485311c8

.github/workflows/diff.yaml (vendored)
@@ -219,3 +219,12 @@ jobs:
          # Run simulation tests.
          cd build
          ctest -R memgraph__simulation --output-on-failure -j$THREADS

      - name: Run e2e tests
        run: |
          # TODO(gitbuda): Setup mgclient and pymgclient properly.
          cd tests
          ./setup.sh
          source ve3/bin/activate
          cd e2e
          LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-root-directory ./distributed_queries

@@ -9,6 +9,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

#include <optional>
#include <unordered_map>
#include <vector>

@@ -365,7 +366,6 @@ std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const {
  if (const auto it = labels.find(label); it != labels.end()) {
    return it->second;
  }

  return std::nullopt;
}

@@ -382,7 +382,6 @@ std::optional<PropertyId> ShardMap::GetPropertyId(const std::string &property_na
  if (const auto it = properties.find(property_name); it != properties.end()) {
    return it->second;
  }

  return std::nullopt;
}

@@ -399,7 +398,6 @@ std::optional<EdgeTypeId> ShardMap::GetEdgeTypeId(const std::string &edge_type)
  if (const auto it = edge_types.find(edge_type); it != edge_types.end()) {
    return it->second;
  }

  return std::nullopt;
}

@@ -411,6 +409,7 @@ const std::string &ShardMap::GetEdgeTypeName(const EdgeTypeId property) const {
  }
  throw utils::BasicException("EdgeTypeId not found!");
}

Shards ShardMap::GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key,
                                   const PrimaryKey &end_key) const {
  MG_ASSERT(start_key <= end_key);

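For orientation, a minimal sketch of how these optional-returning lookups are typically consumed; the label string, the helper name, and the fallback behaviour are illustrative assumptions, not part of this commit:

#include <optional>

#include "coordinator/shard_map.hpp"

// Sketch only: resolve a label name and fall back gracefully when it is unknown.
std::optional<memgraph::storage::v3::LabelId> ResolvePersonLabel(const memgraph::coordinator::ShardMap &shard_map) {
  if (const auto label_id = shard_map.GetLabelId("Person"); label_id) {
    return *label_id;  // known label: the id can be used for shard routing
  }
  return std::nullopt;  // unknown label: the caller decides how to report it
}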
@@ -25,6 +25,7 @@
#include "io/address.hpp"
#include "storage/v3/config.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/name_id_mapper.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/temporal.hpp"
@@ -120,9 +121,9 @@ struct ShardMap {
  std::map<PropertyName, PropertyId> properties;
  std::map<EdgeTypeName, EdgeTypeId> edge_types;
  uint64_t max_label_id{kNotExistingId};
  std::map<LabelName, LabelId> labels;
  std::map<LabelId, LabelSpace> label_spaces;
  std::map<LabelId, std::vector<SchemaProperty>> schemas;
  std::map<LabelName, LabelId> labels;

  [[nodiscard]] static ShardMap Parse(std::istream &input_stream);
  friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map);

@@ -13,7 +13,7 @@
#ifndef MG_AST_INCLUDE_PATH
#ifdef MG_CLANG_TIDY_CHECK
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define MG_AST_INCLUDE_PATH "query/v2/frontend/ast/ast.hpp"
#include "query/v2/bindings/bindings.hpp"
#else
#error Missing AST include path
#endif
@@ -21,8 +21,6 @@

#ifndef MG_INJECTED_NAMESPACE_NAME
#ifdef MG_CLANG_TIDY_CHECK
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define MG_INJECTED_NAMESPACE_NAME memgraph::query::v2
#else
#error Missing AST namespace
#endif

@@ -718,7 +718,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
  TReturnType GetProperty(const TRecordAccessor &record_accessor, PropertyIx prop) {
    auto maybe_prop = record_accessor.GetProperty(prop.name);
    // Handle non-existent property
    return conv_(maybe_prop);
    return conv_(maybe_prop, dba_);
  }

  template <class TRecordAccessor, class TTag = Tag,
@@ -726,7 +726,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
  TReturnType GetProperty(const TRecordAccessor &record_accessor, const std::string_view name) {
    auto maybe_prop = record_accessor.GetProperty(std::string(name));
    // Handle non-existent property
    return conv_(maybe_prop);
    return conv_(maybe_prop, dba_);
  }

  template <class TRecordAccessor, class TTag = Tag,

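The two-argument conv_ call above relies on a conversion functor overloaded on one and two arguments; a minimal sketch of that shape, mirroring the detail::Callable that appears later in this diff (names and surrounding includes are illustrative):

// Sketch: the evaluator's conversion callable, overloaded so the distributed path
// can hand in a request manager for name resolution while the storage path cannot.
struct ConvCallable {
  auto operator()(const memgraph::storage::v3::PropertyValue &val) const {
    return memgraph::storage::v3::PropertyToTypedValue<TypedValue>(val);  // storage-side path
  }
  auto operator()(const msgs::Value &val, memgraph::msgs::ShardRequestManagerInterface *manager) const {
    return ValueToTypedValue(val, manager);  // distributed path: manager resolves ids to names
  }
};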
@@ -18,6 +18,7 @@
#include "coordinator/shard_map.hpp"
#include "query/v2/accessors.hpp"
#include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/edge_accessor.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/result.hpp"
@@ -70,51 +71,52 @@ query::v2::TypedValue ToTypedValue(const Value &value) {
  }
}

storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const query::v2::accessors::VertexAccessor &vertex,
                                                              const coordinator::ShardMap &shard_map,
                                                              storage::v3::View /*view*/) {
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(
    const query::v2::accessors::VertexAccessor &vertex, const msgs::ShardRequestManagerInterface *shard_request_manager,
    storage::v3::View /*view*/) {
  auto id = communication::bolt::Id::FromUint(0);

  auto labels = vertex.Labels();
  std::vector<std::string> new_labels;
  new_labels.reserve(labels.size());
  for (const auto &label : labels) {
    new_labels.push_back(shard_map.GetLabelName(label.id));
    new_labels.push_back(shard_request_manager->LabelToName(label.id));
  }

  auto properties = vertex.Properties();
  std::map<std::string, Value> new_properties;
  for (const auto &[prop, property_value] : properties) {
    new_properties[shard_map.GetPropertyName(prop)] = ToBoltValue(property_value);
    new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value);
  }
  return communication::bolt::Vertex{id, new_labels, new_properties};
}

storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const query::v2::accessors::EdgeAccessor &edge,
                                                           const coordinator::ShardMap &shard_map,
                                                           storage::v3::View /*view*/) {
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(
    const query::v2::accessors::EdgeAccessor &edge, const msgs::ShardRequestManagerInterface *shard_request_manager,
    storage::v3::View /*view*/) {
  // TODO(jbajic) Fix bolt communication
  auto id = communication::bolt::Id::FromUint(0);
  auto from = communication::bolt::Id::FromUint(0);
  auto to = communication::bolt::Id::FromUint(0);
  const auto &type = shard_map.GetEdgeTypeName(edge.EdgeType());
  const auto &type = shard_request_manager->EdgeTypeToName(edge.EdgeType());

  auto properties = edge.Properties();
  std::map<std::string, Value> new_properties;
  for (const auto &[prop, property_value] : properties) {
    new_properties[shard_map.GetPropertyName(prop)] = ToBoltValue(property_value);
    new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value);
  }
  return communication::bolt::Edge{id, from, to, type, new_properties};
}

storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path & /*edge*/,
                                                           const coordinator::ShardMap & /*shard_map*/,
                                                           storage::v3::View /*view*/) {
storage::v3::Result<communication::bolt::Path> ToBoltPath(
    const query::v2::accessors::Path & /*edge*/, const msgs::ShardRequestManagerInterface * /*shard_request_manager*/,
    storage::v3::View /*view*/) {
  // TODO(jbajic) Fix bolt communication
  return {storage::v3::Error::DELETED_OBJECT};
}

storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const coordinator::ShardMap &shard_map,
storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value,
                                       const msgs::ShardRequestManagerInterface *shard_request_manager,
                                       storage::v3::View view) {
  switch (value.type()) {
    case query::v2::TypedValue::Type::Null:
@@ -131,7 +133,7 @@ storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const
      std::vector<Value> values;
      values.reserve(value.ValueList().size());
      for (const auto &v : value.ValueList()) {
        auto maybe_value = ToBoltValue(v, shard_map, view);
        auto maybe_value = ToBoltValue(v, shard_request_manager, view);
        if (maybe_value.HasError()) return maybe_value.GetError();
        values.emplace_back(std::move(*maybe_value));
      }
@@ -140,24 +142,24 @@ storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const
    case query::v2::TypedValue::Type::Map: {
      std::map<std::string, Value> map;
      for (const auto &kv : value.ValueMap()) {
        auto maybe_value = ToBoltValue(kv.second, shard_map, view);
        auto maybe_value = ToBoltValue(kv.second, shard_request_manager, view);
        if (maybe_value.HasError()) return maybe_value.GetError();
        map.emplace(kv.first, std::move(*maybe_value));
      }
      return Value(std::move(map));
    }
    case query::v2::TypedValue::Type::Vertex: {
      auto maybe_vertex = ToBoltVertex(value.ValueVertex(), shard_map, view);
      auto maybe_vertex = ToBoltVertex(value.ValueVertex(), shard_request_manager, view);
      if (maybe_vertex.HasError()) return maybe_vertex.GetError();
      return Value(std::move(*maybe_vertex));
    }
    case query::v2::TypedValue::Type::Edge: {
      auto maybe_edge = ToBoltEdge(value.ValueEdge(), shard_map, view);
      auto maybe_edge = ToBoltEdge(value.ValueEdge(), shard_request_manager, view);
      if (maybe_edge.HasError()) return maybe_edge.GetError();
      return Value(std::move(*maybe_edge));
    }
    case query::v2::TypedValue::Type::Path: {
      auto maybe_path = ToBoltPath(value.ValuePath(), shard_map, view);
      auto maybe_path = ToBoltPath(value.ValuePath(), shard_request_manager, view);
      if (maybe_path.HasError()) return maybe_path.GetError();
      return Value(std::move(*maybe_path));
    }
@@ -209,12 +211,6 @@ Value ToBoltValue(msgs::Value value) {
  }
}

storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path & /*path*/,
                                                           const storage::v3::Shard & /*db*/,
                                                           storage::v3::View /*view*/) {
  return communication::bolt::Path();
}

storage::v3::PropertyValue ToPropertyValue(const Value &value) {
  switch (value.type()) {
    case Value::Type::Null:

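For context, a minimal call-site sketch of the new conversion entry point; the interpreter and result variables here are assumptions for illustration, and the call shape mirrors how the Bolt session uses it later in this diff:

// Sketch: converting one query result value for Bolt with the new signature.
const memgraph::msgs::ShardRequestManagerInterface *manager = interpreter.GetShardRequestManager();
auto maybe_bolt_value = memgraph::glue::v2::ToBoltValue(result, manager, memgraph::storage::v3::View::NEW);
if (maybe_bolt_value.HasError()) {
  // map the storage::v3::Error (e.g. DELETED_OBJECT) to a client-facing failure
}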
@@ -15,6 +15,7 @@
#include "communication/bolt/v1/value.hpp"
#include "coordinator/shard_map.hpp"
#include "query/v2/bindings/typed_value.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/result.hpp"
#include "storage/v3/shard.hpp"
@@ -30,40 +31,40 @@ namespace memgraph::glue::v2 {

/// @param storage::v3::VertexAccessor for converting to
/// communication::bolt::Vertex.
/// @param coordinator::ShardMap shard_map getting label and property names.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting label and property names.
/// @param storage::v3::View for deciding which vertex attributes are visible.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const storage::v3::VertexAccessor &vertex,
                                                              const coordinator::ShardMap &shard_map,
                                                              storage::v3::View view);
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(
    const storage::v3::VertexAccessor &vertex, const msgs::ShardRequestManagerInterface *shard_request_manager,
    storage::v3::View view);

/// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge.
/// @param coordinator::ShardMap shard_map getting edge type and property names.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting edge type and property names.
/// @param storage::v3::View for deciding which edge attributes are visible.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const storage::v3::EdgeAccessor &edge,
                                                           const coordinator::ShardMap &shard_map,
                                                           storage::v3::View view);
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(
    const storage::v3::EdgeAccessor &edge, const msgs::ShardRequestManagerInterface *shard_request_manager,
    storage::v3::View view);

/// @param query::v2::Path for converting to communication::bolt::Path.
/// @param coordinator::ShardMap shard_map ToBoltVertex and ToBoltEdge.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path &path,
                                                           const coordinator::ShardMap &shard_map,
                                                           storage::v3::View view);
storage::v3::Result<communication::bolt::Path> ToBoltPath(
    const query::v2::accessors::Path &path, const msgs::ShardRequestManagerInterface *shard_request_manager,
    storage::v3::View view);

/// @param query::v2::TypedValue for converting to communication::bolt::Value.
/// @param coordinator::ShardMap shard_map ToBoltVertex and ToBoltEdge.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Value> ToBoltValue(const query::v2::TypedValue &value,
                                                             const coordinator::ShardMap &shard_map,
                                                             storage::v3::View view);
storage::v3::Result<communication::bolt::Value> ToBoltValue(
    const query::v2::TypedValue &value, const msgs::ShardRequestManagerInterface *shard_request_manager,
    storage::v3::View view);

query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value);

@@ -73,7 +74,8 @@ storage::v3::PropertyValue ToPropertyValue(const communication::bolt::Value &val

communication::bolt::Value ToBoltValue(msgs::Value value);

communication::bolt::Value ToBoltValue(msgs::Value value, const coordinator::ShardMap &shard_map,
communication::bolt::Value ToBoltValue(msgs::Value value,
                                       const msgs::ShardRequestManagerInterface *shard_request_manager,
                                       storage::v3::View view);

} // namespace memgraph::glue::v2

@ -407,9 +407,8 @@ DEFINE_string(organization_name, "", "Organization name.");
|
||||
struct SessionData {
|
||||
// Explicit constructor here to ensure that pointers to all objects are
|
||||
// supplied.
|
||||
SessionData(memgraph::coordinator::ShardMap &shard_map, memgraph::query::v2::InterpreterContext *interpreter_context)
|
||||
: shard_map(&shard_map), interpreter_context(interpreter_context) {}
|
||||
memgraph::coordinator::ShardMap *shard_map;
|
||||
explicit SessionData(memgraph::query::v2::InterpreterContext *interpreter_context)
|
||||
: interpreter_context(interpreter_context) {}
|
||||
memgraph::query::v2::InterpreterContext *interpreter_context;
|
||||
};
|
||||
|
||||
@ -424,7 +423,6 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
|
||||
memgraph::communication::v2::OutputStream *output_stream)
|
||||
: memgraph::communication::bolt::Session<memgraph::communication::v2::InputStream,
|
||||
memgraph::communication::v2::OutputStream>(input_stream, output_stream),
|
||||
shard_map_(data.shard_map),
|
||||
interpreter_(data.interpreter_context),
|
||||
endpoint_(endpoint) {}
|
||||
|
||||
@ -455,7 +453,7 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
|
||||
|
||||
std::map<std::string, memgraph::communication::bolt::Value> Pull(TEncoder *encoder, std::optional<int> n,
|
||||
std::optional<int> qid) override {
|
||||
TypedValueResultStream stream(encoder, *shard_map_);
|
||||
TypedValueResultStream stream(encoder, interpreter_.GetShardRequestManager());
|
||||
return PullResults(stream, n, qid);
|
||||
}
|
||||
|
||||
@ -482,7 +480,8 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
|
||||
const auto &summary = interpreter_.Pull(&stream, n, qid);
|
||||
std::map<std::string, memgraph::communication::bolt::Value> decoded_summary;
|
||||
for (const auto &kv : summary) {
|
||||
auto maybe_value = memgraph::glue::v2::ToBoltValue(kv.second, *shard_map_, memgraph::storage::v3::View::NEW);
|
||||
auto maybe_value = memgraph::glue::v2::ToBoltValue(kv.second, interpreter_.GetShardRequestManager(),
|
||||
memgraph::storage::v3::View::NEW);
|
||||
if (maybe_value.HasError()) {
|
||||
switch (maybe_value.GetError()) {
|
||||
case memgraph::storage::v3::Error::DELETED_OBJECT:
|
||||
@ -507,14 +506,14 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
|
||||
/// before forwarding the calls to original TEncoder.
|
||||
class TypedValueResultStream {
|
||||
public:
|
||||
TypedValueResultStream(TEncoder *encoder, const memgraph::coordinator::ShardMap &shard_map)
|
||||
: encoder_(encoder), shard_map_(&shard_map) {}
|
||||
TypedValueResultStream(TEncoder *encoder, const memgraph::msgs::ShardRequestManagerInterface *shard_request_manager)
|
||||
: encoder_(encoder), shard_request_manager_(shard_request_manager) {}
|
||||
|
||||
void Result(const std::vector<memgraph::query::v2::TypedValue> &values) {
|
||||
std::vector<memgraph::communication::bolt::Value> decoded_values;
|
||||
decoded_values.reserve(values.size());
|
||||
for (const auto &v : values) {
|
||||
auto maybe_value = memgraph::glue::v2::ToBoltValue(v, *shard_map_, memgraph::storage::v3::View::NEW);
|
||||
auto maybe_value = memgraph::glue::v2::ToBoltValue(v, shard_request_manager_, memgraph::storage::v3::View::NEW);
|
||||
if (maybe_value.HasError()) {
|
||||
switch (maybe_value.GetError()) {
|
||||
case memgraph::storage::v3::Error::DELETED_OBJECT:
|
||||
@ -534,12 +533,8 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
|
||||
|
||||
private:
|
||||
TEncoder *encoder_;
|
||||
// NOTE: Needed only for ToBoltValue conversions
|
||||
const memgraph::coordinator::ShardMap *shard_map_;
|
||||
const memgraph::msgs::ShardRequestManagerInterface *shard_request_manager_{nullptr};
|
||||
};
|
||||
|
||||
// NOTE: Needed only for ToBoltValue conversions
|
||||
const memgraph::coordinator::ShardMap *shard_map_;
|
||||
memgraph::query::v2::Interpreter interpreter_;
|
||||
memgraph::communication::v2::ServerEndpoint endpoint_;
|
||||
};
|
||||
@ -680,7 +675,7 @@ int main(int argc, char **argv) {
|
||||
std::move(io),
|
||||
mm.CoordinatorAddress()};
|
||||
|
||||
SessionData session_data{sm, &interpreter_context};
|
||||
SessionData session_data{&interpreter_context};
|
||||
|
||||
interpreter_context.auth = nullptr;
|
||||
interpreter_context.auth_checker = nullptr;
|
||||
|
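In short, the session no longer carries a ShardMap pointer; a rough sketch of the new wiring, where the member and type names follow the diff above but the surrounding BoltSession class is omitted and the helper itself is hypothetical:

// Sketch only: results are streamed with the request manager taken from the
// interpreter instead of a ShardMap handed in through SessionData.
void StreamRow(TEncoder *encoder, memgraph::query::v2::Interpreter &interpreter,
               const std::vector<memgraph::query::v2::TypedValue> &row) {
  TypedValueResultStream stream(encoder, interpreter.GetShardRequestManager());
  stream.Result(row);  // each value goes through ToBoltValue(..., shard_request_manager, ...)
}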
@ -11,38 +11,59 @@
|
||||
|
||||
#include "query/v2/accessors.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "query/v2/shard_request_manager.hpp"
|
||||
#include "storage/v3/id_types.hpp"
|
||||
|
||||
namespace memgraph::query::v2::accessors {
|
||||
EdgeAccessor::EdgeAccessor(Edge edge) : edge(std::move(edge)) {}
|
||||
EdgeAccessor::EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager)
|
||||
: edge(std::move(edge)), manager_(manager) {}
|
||||
|
||||
EdgeTypeId EdgeAccessor::EdgeType() const { return edge.type.id; }
|
||||
|
||||
const std::vector<std::pair<PropertyId, Value>> &EdgeAccessor::Properties() const {
|
||||
return edge.properties;
|
||||
// std::map<std::string, TypedValue> res;
|
||||
// for (const auto &[name, value] : *properties) {
|
||||
// res[name] = ValueToTypedValue(value);
|
||||
// }
|
||||
// return res;
|
||||
}
|
||||
const std::vector<std::pair<PropertyId, Value>> &EdgeAccessor::Properties() const { return edge.properties; }
|
||||
|
||||
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
|
||||
Value EdgeAccessor::GetProperty(const std::string & /*prop_name*/) const {
|
||||
// TODO(kostasrim) fix this
|
||||
return {};
|
||||
Value EdgeAccessor::GetProperty(const std::string &prop_name) const {
|
||||
auto prop_id = manager_->NameToProperty(prop_name);
|
||||
auto it = std::find_if(edge.properties.begin(), edge.properties.end(), [&](auto &pr) { return prop_id == pr.first; });
|
||||
if (it == edge.properties.end()) {
|
||||
return {};
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
const Edge &EdgeAccessor::GetEdge() const { return edge; }
|
||||
|
||||
bool EdgeAccessor::IsCycle() const { return edge.src == edge.dst; };
|
||||
|
||||
VertexAccessor EdgeAccessor::To() const { return VertexAccessor(Vertex{edge.dst}, {}); }
|
||||
VertexAccessor EdgeAccessor::To() const {
|
||||
return VertexAccessor(Vertex{edge.dst}, std::vector<std::pair<PropertyId, msgs::Value>>{}, manager_);
|
||||
}
|
||||
|
||||
VertexAccessor EdgeAccessor::From() const { return VertexAccessor(Vertex{edge.src}, {}); }
|
||||
VertexAccessor EdgeAccessor::From() const {
|
||||
return VertexAccessor(Vertex{edge.src}, std::vector<std::pair<PropertyId, msgs::Value>>{}, manager_);
|
||||
}
|
||||
|
||||
VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props)
|
||||
: vertex(std::move(v)), properties(std::move(props)) {}
|
||||
VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props,
|
||||
const msgs::ShardRequestManagerInterface *manager)
|
||||
: vertex(std::move(v)), properties(std::move(props)), manager_(manager) {}
|
||||
|
||||
VertexAccessor::VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props,
|
||||
const msgs::ShardRequestManagerInterface *manager)
|
||||
: vertex(std::move(v)), manager_(manager) {
|
||||
properties.reserve(props.size());
|
||||
for (auto &[id, value] : props) {
|
||||
properties.emplace_back(std::make_pair(id, std::move(value)));
|
||||
}
|
||||
}
|
||||
|
||||
VertexAccessor::VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props,
|
||||
const msgs::ShardRequestManagerInterface *manager)
|
||||
: vertex(std::move(v)), manager_(manager) {
|
||||
properties.reserve(props.size());
|
||||
for (const auto &[id, value] : props) {
|
||||
properties.emplace_back(std::make_pair(id, value));
|
||||
}
|
||||
}
|
||||
|
||||
Label VertexAccessor::PrimaryLabel() const { return vertex.id.first; }
|
||||
|
||||
@ -58,15 +79,16 @@ bool VertexAccessor::HasLabel(Label &label) const {
|
||||
const std::vector<std::pair<PropertyId, Value>> &VertexAccessor::Properties() const { return properties; }
|
||||
|
||||
Value VertexAccessor::GetProperty(PropertyId prop_id) const {
|
||||
return std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; })->second;
|
||||
// return ValueToTypedValue(properties[prop_name]);
|
||||
auto it = std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; });
|
||||
if (it == properties.end()) {
|
||||
return {};
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
|
||||
Value VertexAccessor::GetProperty(const std::string & /*prop_name*/) const {
|
||||
// TODO(kostasrim) Add string mapping
|
||||
return {};
|
||||
// return ValueToTypedValue(properties[prop_name]);
|
||||
Value VertexAccessor::GetProperty(const std::string &prop_name) const {
|
||||
return GetProperty(manager_->NameToProperty(prop_name));
|
||||
}
|
||||
|
||||
msgs::Vertex VertexAccessor::GetVertex() const { return vertex; }
|
||||
|
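A minimal call-site sketch of the name-based property lookup added above; the property name and the helper function are illustrative assumptions:

// Sketch: with a request manager attached, an accessor can resolve property names itself.
// GetProperty("age") goes through manager_->NameToProperty("age") and then the id-based
// lookup; a missing property comes back as a default-constructed (null) msgs::Value.
void UseAge(const memgraph::query::v2::accessors::VertexAccessor &vertex) {
  const auto age = vertex.GetProperty("age");  // "age" is an illustrative property name
  // ... convert or forward `age` as needed ...
}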
@ -11,6 +11,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
#include <optional>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
@ -23,6 +24,10 @@
|
||||
#include "utils/memory.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
|
||||
namespace memgraph::msgs {
|
||||
class ShardRequestManagerInterface;
|
||||
} // namespace memgraph::msgs
|
||||
|
||||
namespace memgraph::query::v2::accessors {
|
||||
|
||||
using Value = memgraph::msgs::Value;
|
||||
@ -36,7 +41,7 @@ class VertexAccessor;
|
||||
|
||||
class EdgeAccessor final {
|
||||
public:
|
||||
explicit EdgeAccessor(Edge edge);
|
||||
explicit EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager);
|
||||
|
||||
[[nodiscard]] EdgeTypeId EdgeType() const;
|
||||
|
||||
@ -64,6 +69,7 @@ class EdgeAccessor final {
|
||||
|
||||
private:
|
||||
Edge edge;
|
||||
const msgs::ShardRequestManagerInterface *manager_;
|
||||
};
|
||||
|
||||
class VertexAccessor final {
|
||||
@ -71,7 +77,11 @@ class VertexAccessor final {
|
||||
using PropertyId = msgs::PropertyId;
|
||||
using Label = msgs::Label;
|
||||
using VertexId = msgs::VertexId;
|
||||
VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props);
|
||||
VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props,
|
||||
const msgs::ShardRequestManagerInterface *manager);
|
||||
|
||||
VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props, const msgs::ShardRequestManagerInterface *manager);
|
||||
VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props, const msgs::ShardRequestManagerInterface *manager);
|
||||
|
||||
[[nodiscard]] Label PrimaryLabel() const;
|
||||
|
||||
@ -140,6 +150,7 @@ class VertexAccessor final {
|
||||
private:
|
||||
Vertex vertex;
|
||||
std::vector<std::pair<PropertyId, Value>> properties;
|
||||
const msgs::ShardRequestManagerInterface *manager_;
|
||||
};
|
||||
|
||||
// inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }
|
||||
|
@ -23,6 +23,10 @@
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "storage/v3/view.hpp"
|
||||
|
||||
namespace memgraph::msgs {
|
||||
class ShardRequestManagerInterface;
|
||||
} // namespace memgraph::msgs
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
|
||||
inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); };
|
||||
@ -32,13 +36,14 @@ class Callable {
|
||||
auto operator()(const memgraph::storage::v3::PropertyValue &val) const {
|
||||
return memgraph::storage::v3::PropertyToTypedValue<TypedValue>(val);
|
||||
};
|
||||
auto operator()(const msgs::Value &val) const { return ValueToTypedValue(val); };
|
||||
auto operator()(const msgs::Value &val, memgraph::msgs::ShardRequestManagerInterface *manager) const {
|
||||
return ValueToTypedValue(val, manager);
|
||||
};
|
||||
};
|
||||
|
||||
} // namespace detail
|
||||
using ExpressionEvaluator =
|
||||
memgraph::expr::ExpressionEvaluator<TypedValue, memgraph::query::v2::EvaluationContext, DbAccessor,
|
||||
storage::v3::View, storage::v3::LabelId, msgs::Value, detail::Callable,
|
||||
memgraph::storage::v3::Error, memgraph::expr::QueryEngineTag>;
|
||||
using ExpressionEvaluator = memgraph::expr::ExpressionEvaluator<
|
||||
TypedValue, memgraph::query::v2::EvaluationContext, memgraph::msgs::ShardRequestManagerInterface, storage::v3::View,
|
||||
storage::v3::LabelId, msgs::Value, detail::Callable, memgraph::storage::v3::Error, memgraph::expr::QueryEngineTag>;
|
||||
|
||||
} // namespace memgraph::query::v2
|
||||
|
@ -13,10 +13,11 @@
|
||||
#include "bindings/typed_value.hpp"
|
||||
#include "query/v2/accessors.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "query/v2/shard_request_manager.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
|
||||
inline TypedValue ValueToTypedValue(const msgs::Value &value) {
|
||||
inline TypedValue ValueToTypedValue(const msgs::Value &value, msgs::ShardRequestManagerInterface *manager) {
|
||||
using Value = msgs::Value;
|
||||
switch (value.type) {
|
||||
case Value::Type::Null:
|
||||
@ -34,7 +35,7 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) {
|
||||
std::vector<TypedValue> dst;
|
||||
dst.reserve(lst.size());
|
||||
for (const auto &elem : lst) {
|
||||
dst.push_back(ValueToTypedValue(elem));
|
||||
dst.push_back(ValueToTypedValue(elem, manager));
|
||||
}
|
||||
return TypedValue(std::move(dst));
|
||||
}
|
||||
@ -42,14 +43,15 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) {
|
||||
const auto &value_map = value.map_v;
|
||||
std::map<std::string, TypedValue> dst;
|
||||
for (const auto &[key, val] : value_map) {
|
||||
dst[key] = ValueToTypedValue(val);
|
||||
dst[key] = ValueToTypedValue(val, manager);
|
||||
}
|
||||
return TypedValue(std::move(dst));
|
||||
}
|
||||
case Value::Type::Vertex:
|
||||
return TypedValue(accessors::VertexAccessor(value.vertex_v, {}));
|
||||
return TypedValue(accessors::VertexAccessor(
|
||||
value.vertex_v, std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>{}, manager));
|
||||
case Value::Type::Edge:
|
||||
return TypedValue(accessors::EdgeAccessor(value.edge_v));
|
||||
return TypedValue(accessors::EdgeAccessor(value.edge_v, manager));
|
||||
}
|
||||
throw std::runtime_error("Incorrect type in conversion");
|
||||
}
|
||||
|
@ -22,8 +22,8 @@
|
||||
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
#include "query/v2/conversions.hpp"
|
||||
#include "query/v2/db_accessor.hpp"
|
||||
#include "query/v2/exceptions.hpp"
|
||||
#include "query/v2/shard_request_manager.hpp"
|
||||
#include "storage/v3/conversions.hpp"
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/temporal.hpp"
|
||||
|
@ -16,12 +16,15 @@
|
||||
#include <unordered_map>
|
||||
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
#include "query/v2/db_accessor.hpp"
|
||||
#include "storage/v3/view.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
namespace memgraph::msgs {
|
||||
class ShardRequestManagerInterface;
|
||||
} // namespace memgraph::msgs
|
||||
|
||||
class DbAccessor;
|
||||
namespace memgraph::query::v2 {
|
||||
|
||||
namespace {
|
||||
const char kStartsWith[] = "STARTSWITH";
|
||||
@ -31,7 +34,9 @@ const char kId[] = "ID";
|
||||
} // namespace
|
||||
|
||||
struct FunctionContext {
|
||||
DbAccessor *db_accessor;
|
||||
// TODO(kostasrim) consider optional here. ShardRequestManager does not exist on the storage.
|
||||
// DbAccessor *db_accessor;
|
||||
msgs::ShardRequestManagerInterface *manager;
|
||||
utils::MemoryResource *memory;
|
||||
int64_t timestamp;
|
||||
std::unordered_map<std::string, int64_t> *counters;
|
||||
|
@ -144,7 +144,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler {
|
||||
/// @throw QueryRuntimeException if an error occurred.
|
||||
|
||||
Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters ¶meters,
|
||||
DbAccessor *db_accessor) {
|
||||
msgs::ShardRequestManagerInterface *manager) {
|
||||
// Empty frame for evaluation of password expression. This is OK since
|
||||
// password should be either null or string literal and it's evaluation
|
||||
// should not depend on frame.
|
||||
@ -155,7 +155,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
|
||||
// the argument to Callback.
|
||||
evaluation_context.timestamp = QueryTimestamp();
|
||||
evaluation_context.parameters = parameters;
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
|
||||
|
||||
std::string username = auth_query->user_;
|
||||
std::string rolename = auth_query->role_;
|
||||
@ -313,7 +313,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
|
||||
}
|
||||
|
||||
Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters ¶meters,
|
||||
InterpreterContext *interpreter_context, DbAccessor *db_accessor,
|
||||
InterpreterContext *interpreter_context, msgs::ShardRequestManagerInterface *manager,
|
||||
std::vector<Notification> *notifications) {
|
||||
expr::Frame<TypedValue> frame(0);
|
||||
SymbolTable symbol_table;
|
||||
@ -322,7 +322,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
|
||||
// the argument to Callback.
|
||||
evaluation_context.timestamp = QueryTimestamp();
|
||||
evaluation_context.parameters = parameters;
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
|
||||
|
||||
Callback callback;
|
||||
switch (repl_query->action_) {
|
||||
@ -448,7 +448,8 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
|
||||
}
|
||||
}
|
||||
|
||||
Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶meters, DbAccessor *db_accessor) {
|
||||
Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶meters,
|
||||
msgs::ShardRequestManagerInterface *manager) {
|
||||
expr::Frame<TypedValue> frame(0);
|
||||
SymbolTable symbol_table;
|
||||
EvaluationContext evaluation_context;
|
||||
@ -458,7 +459,7 @@ Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶m
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
evaluation_context.parameters = parameters;
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
|
||||
|
||||
Callback callback;
|
||||
switch (setting_query->action_) {
|
||||
@ -886,7 +887,8 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
|
||||
EvaluationContext evaluation_context;
|
||||
evaluation_context.timestamp = QueryTimestamp();
|
||||
evaluation_context.parameters = parsed_query.parameters;
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::v3::View::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager,
|
||||
storage::v3::View::OLD);
|
||||
const auto memory_limit =
|
||||
expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
|
||||
if (memory_limit) {
|
||||
@ -901,7 +903,6 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
|
||||
"convert the parsed row values to the appropriate type. This can be done using the built-in "
|
||||
"conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
|
||||
}
|
||||
shard_request_manager->StartTransaction();
|
||||
auto plan = CypherQueryToPlan(
|
||||
parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query, parsed_query.parameters,
|
||||
parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, shard_request_manager);
|
||||
@ -957,10 +958,10 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string
|
||||
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
|
||||
MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN");
|
||||
|
||||
auto cypher_query_plan =
|
||||
CypherQueryToPlan(parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage),
|
||||
cypher_query, parsed_inner_query.parameters,
|
||||
parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, nullptr);
|
||||
auto cypher_query_plan = CypherQueryToPlan(
|
||||
parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query,
|
||||
parsed_inner_query.parameters, parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr,
|
||||
shard_request_manager);
|
||||
|
||||
std::stringstream printed_plan;
|
||||
plan::PrettyPrint(*shard_request_manager, &cypher_query_plan->plan(), &printed_plan);
|
||||
@ -1030,7 +1031,8 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
|
||||
EvaluationContext evaluation_context;
|
||||
evaluation_context.timestamp = QueryTimestamp();
|
||||
evaluation_context.parameters = parsed_inner_query.parameters;
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::v3::View::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager,
|
||||
storage::v3::View::OLD);
|
||||
const auto memory_limit =
|
||||
expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
|
||||
|
||||
@ -1179,14 +1181,15 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
|
||||
|
||||
PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
|
||||
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
|
||||
DbAccessor *dba, utils::MemoryResource *execution_memory) {
|
||||
DbAccessor *dba, utils::MemoryResource *execution_memory,
|
||||
msgs::ShardRequestManagerInterface *manager) {
|
||||
if (in_explicit_transaction) {
|
||||
throw UserModificationInMulticommandTxException();
|
||||
}
|
||||
|
||||
auto *auth_query = utils::Downcast<AuthQuery>(parsed_query.query);
|
||||
|
||||
auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, dba);
|
||||
auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, manager);
|
||||
|
||||
SymbolTable symbol_table;
|
||||
std::vector<Symbol> output_symbols;
|
||||
@ -1215,14 +1218,14 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa
|
||||
|
||||
PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
|
||||
std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
|
||||
DbAccessor *dba) {
|
||||
msgs::ShardRequestManagerInterface *manager) {
|
||||
if (in_explicit_transaction) {
|
||||
throw ReplicationModificationInMulticommandTxException();
|
||||
}
|
||||
|
||||
auto *replication_query = utils::Downcast<ReplicationQuery>(parsed_query.query);
|
||||
auto callback =
|
||||
HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, dba, notifications);
|
||||
HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, manager, notifications);
|
||||
|
||||
return PreparedQuery{callback.header, std::move(parsed_query.required_privileges),
|
||||
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
|
||||
@ -1310,14 +1313,15 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli
|
||||
throw SemanticException("CreateSnapshot query is not supported!");
|
||||
}
|
||||
|
||||
PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, DbAccessor *dba) {
|
||||
PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
|
||||
msgs::ShardRequestManagerInterface *manager) {
|
||||
if (in_explicit_transaction) {
|
||||
throw SettingConfigInMulticommandTxException{};
|
||||
}
|
||||
|
||||
auto *setting_query = utils::Downcast<SettingQuery>(parsed_query.query);
|
||||
MG_ASSERT(setting_query);
|
||||
auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, dba);
|
||||
auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, manager);
|
||||
|
||||
return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
|
||||
[callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
|
||||
@ -1511,6 +1515,11 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
ParsedQuery parsed_query =
|
||||
ParseQuery(query_string, params, &interpreter_context_->ast_cache, interpreter_context_->config.query);
|
||||
query_execution->summary["parsing_time"] = parsing_timer.Elapsed().count();
|
||||
if (!in_explicit_transaction_ &&
|
||||
(utils::Downcast<CypherQuery>(parsed_query.query) || utils::Downcast<ExplainQuery>(parsed_query.query) ||
|
||||
utils::Downcast<ProfileQuery>(parsed_query.query))) {
|
||||
shard_request_manager_->StartTransaction();
|
||||
}
|
||||
|
||||
utils::Timer planning_timer;
|
||||
PreparedQuery prepared_query;
|
||||
@ -1533,9 +1542,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_,
|
||||
&query_execution->notifications, interpreter_context_);
|
||||
} else if (utils::Downcast<AuthQuery>(parsed_query.query)) {
|
||||
prepared_query = PrepareAuthQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
|
||||
interpreter_context_, &*execution_db_accessor_,
|
||||
&query_execution->execution_memory_with_exception);
|
||||
prepared_query = PrepareAuthQuery(
|
||||
std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_,
|
||||
&*execution_db_accessor_, &query_execution->execution_memory_with_exception, shard_request_manager_.get());
|
||||
} else if (utils::Downcast<InfoQuery>(parsed_query.query)) {
|
||||
prepared_query = PrepareInfoQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
|
||||
interpreter_context_, interpreter_context_->db,
|
||||
@ -1546,7 +1555,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
} else if (utils::Downcast<ReplicationQuery>(parsed_query.query)) {
|
||||
prepared_query =
|
||||
PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
|
||||
interpreter_context_, &*execution_db_accessor_);
|
||||
interpreter_context_, shard_request_manager_.get());
|
||||
} else if (utils::Downcast<LockPathQuery>(parsed_query.query)) {
|
||||
prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
|
||||
&*execution_db_accessor_);
|
||||
@ -1563,7 +1572,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
prepared_query =
|
||||
PrepareCreateSnapshotQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
|
||||
} else if (utils::Downcast<SettingQuery>(parsed_query.query)) {
|
||||
prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, &*execution_db_accessor_);
|
||||
prepared_query =
|
||||
PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, shard_request_manager_.get());
|
||||
} else if (utils::Downcast<VersionQuery>(parsed_query.query)) {
|
||||
prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_);
|
||||
} else if (utils::Downcast<SchemaQuery>(parsed_query.query)) {
|
||||
|
@ -296,6 +296,8 @@ class Interpreter final {
|
||||
*/
|
||||
void Abort();
|
||||
|
||||
const msgs::ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); }
|
||||
|
||||
private:
|
||||
struct QueryExecution {
|
||||
std::optional<PreparedQuery> prepared_query;
|
||||
|
@ -169,6 +169,7 @@ class DistributedCreateNodeCursor : public Cursor {
|
||||
if (input_cursor_->Pull(frame, context)) {
|
||||
auto &shard_manager = context.shard_request_manager;
|
||||
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
|
||||
PlaceNodeOnTheFrame(frame, context);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -179,8 +180,19 @@ class DistributedCreateNodeCursor : public Cursor {
|
||||
|
||||
void Reset() override { state_ = {}; }
|
||||
|
||||
std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) const {
|
||||
void PlaceNodeOnTheFrame(Frame &frame, ExecutionContext &context) {
|
||||
// TODO(kostasrim) Make this work with batching
|
||||
const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]};
|
||||
msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[0])};
|
||||
frame[nodes_info_.front()->symbol] = TypedValue(
|
||||
query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.shard_request_manager));
|
||||
}
|
||||
|
||||
std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) {
|
||||
std::vector<msgs::NewVertex> requests;
|
||||
// TODO(kostasrim) this assertion should be removed once we support multiple vertex creation
|
||||
MG_ASSERT(nodes_info_.size() == 1);
|
||||
msgs::PrimaryKey pk;
|
||||
for (const auto &node_info : nodes_info_) {
|
||||
msgs::NewVertex rqst;
|
||||
MG_ASSERT(!node_info->labels.empty(), "Cannot determine primary label");
|
||||
@ -188,17 +200,14 @@ class DistributedCreateNodeCursor : public Cursor {
|
||||
// TODO(jbajic) Fix properties not send,
|
||||
// suggestion: ignore distinction between properties and primary keys
|
||||
// since schema validation is done on storage side
|
||||
std::map<msgs::PropertyId, msgs::Value> properties;
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
|
||||
storage::v3::View::NEW);
|
||||
if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) {
|
||||
for (const auto &[key, value_expression] : *node_info_properties) {
|
||||
TypedValue val = value_expression->Accept(evaluator);
|
||||
|
||||
if (context.shard_request_manager->IsPrimaryKey(primary_label, key)) {
|
||||
rqst.primary_key.push_back(TypedValueToValue(val));
|
||||
} else {
|
||||
properties[key] = TypedValueToValue(val);
|
||||
pk.push_back(TypedValueToValue(val));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -207,9 +216,8 @@ class DistributedCreateNodeCursor : public Cursor {
|
||||
auto key_str = std::string(key);
|
||||
auto property_id = context.shard_request_manager->NameToProperty(key_str);
|
||||
if (context.shard_request_manager->IsPrimaryKey(primary_label, property_id)) {
|
||||
rqst.primary_key.push_back(storage::v3::TypedValueToValue(value));
|
||||
} else {
|
||||
properties[property_id] = TypedValueToValue(value);
|
||||
rqst.primary_key.push_back(TypedValueToValue(value));
|
||||
pk.push_back(TypedValueToValue(value));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -219,14 +227,18 @@ class DistributedCreateNodeCursor : public Cursor {
|
||||
}
|
||||
// TODO(kostasrim) Copy non primary labels as well
|
||||
rqst.label_ids.push_back(msgs::Label{.id = primary_label});
|
||||
src_vertex_props_.push_back(rqst.properties);
|
||||
requests.push_back(std::move(rqst));
|
||||
}
|
||||
primary_keys_.push_back(std::move(pk));
|
||||
return requests;
|
||||
}
|
||||
|
||||
private:
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
std::vector<const NodeCreationInfo *> nodes_info_;
|
||||
std::vector<std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>> src_vertex_props_;
|
||||
std::vector<msgs::PrimaryKey> primary_keys_;
|
||||
msgs::ExecutionState<msgs::CreateVerticesRequest> state_;
|
||||
};
|
||||
|
||||
@ -687,7 +699,7 @@ bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
|
||||
// Like all filters, newly set values should not affect filtering of old
|
||||
// nodes and edges.
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.shard_request_manager,
|
||||
storage::v3::View::OLD);
|
||||
while (input_cursor_->Pull(frame, context)) {
|
||||
if (EvaluateFilter(evaluator, self_.expression_)) return true;
|
||||
@ -728,8 +740,8 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
|
||||
if (input_cursor_->Pull(frame, context)) {
|
||||
// Produce should always yield the latest results.
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
storage::v3::View::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||
context.shard_request_manager, storage::v3::View::NEW);
|
||||
for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator);
|
||||
|
||||
return true;
|
||||
@ -1149,8 +1161,8 @@ class AggregateCursor : public Cursor {
|
||||
* aggregation results, and not on the number of inputs.
|
||||
*/
|
||||
void ProcessAll(Frame *frame, ExecutionContext *context) {
|
||||
ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, context->db_accessor,
|
||||
storage::v3::View::NEW);
|
||||
ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context,
|
||||
context->shard_request_manager, storage::v3::View::NEW);
|
||||
while (input_cursor_->Pull(*frame, *context)) {
|
||||
ProcessOne(*frame, &evaluator);
|
||||
}
|
||||
@ -1370,8 +1382,8 @@ bool Skip::SkipCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
// First successful pull from the input, evaluate the skip expression.
|
||||
// The skip expression doesn't contain identifiers so graph view
|
||||
// parameter is not important.
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
storage::v3::View::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||
context.shard_request_manager, storage::v3::View::OLD);
|
||||
TypedValue to_skip = self_.expression_->Accept(evaluator);
|
||||
if (to_skip.type() != TypedValue::Type::Int)
|
||||
throw QueryRuntimeException("Number of elements to skip must be an integer.");
|
||||
@ -1425,8 +1437,8 @@ bool Limit::LimitCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
if (limit_ == -1) {
|
||||
// Limit expression doesn't contain identifiers so graph view is not
|
||||
// important.
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
storage::v3::View::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||
context.shard_request_manager, storage::v3::View::OLD);
|
||||
TypedValue limit = self_.expression_->Accept(evaluator);
|
||||
if (limit.type() != TypedValue::Type::Int)
|
||||
throw QueryRuntimeException("Limit on number of returned elements must be an integer.");
|
||||
@ -1481,8 +1493,8 @@ class OrderByCursor : public Cursor {
|
||||
SCOPED_PROFILE_OP("OrderBy");
|
||||
|
||||
if (!did_pull_all_) {
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
storage::v3::View::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||
context.shard_request_manager, storage::v3::View::OLD);
|
||||
auto *mem = cache_.get_allocator().GetMemoryResource();
|
||||
while (input_cursor_->Pull(frame, context)) {
|
||||
// collect the order_by elements
|
||||
@ -1739,8 +1751,8 @@ class UnwindCursor : public Cursor {
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
|
||||
// successful pull from input, initialize value and iterator
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
storage::v3::View::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||
context.shard_request_manager, storage::v3::View::OLD);
|
||||
TypedValue input_value = self_.input_expression_->Accept(evaluator);
|
||||
if (input_value.type() != TypedValue::Type::List)
|
||||
throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type());
|
||||
@ -2217,7 +2229,7 @@ class LoadCsvCursor : public Cursor {
|
||||
// self_->delimiter_, and self_->quote_ earlier (say, in the interpreter.cpp)
|
||||
// without massacring the code even worse than I did here
|
||||
if (UNLIKELY(!reader_)) {
|
||||
reader_ = MakeReader(&context.evaluation_context);
|
||||
reader_ = MakeReader(context);
|
||||
}
|
||||
|
||||
bool input_pulled = input_cursor_->Pull(frame, context);
|
||||
@ -2246,11 +2258,12 @@ class LoadCsvCursor : public Cursor {
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
private:
|
||||
csv::Reader MakeReader(EvaluationContext *eval_context) {
|
||||
csv::Reader MakeReader(ExecutionContext &context) {
|
||||
auto &eval_context = context.evaluation_context;
|
||||
Frame frame(0);
|
||||
SymbolTable symbol_table;
|
||||
DbAccessor *dba = nullptr;
|
||||
auto evaluator = ExpressionEvaluator(&frame, symbol_table, *eval_context, dba, storage::v3::View::OLD);
|
||||
auto evaluator =
|
||||
ExpressionEvaluator(&frame, symbol_table, eval_context, context.shard_request_manager, storage::v3::View::OLD);
|
||||
|
||||
auto maybe_file = ToOptionalString(&evaluator, self_->file_);
|
||||
auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_);
|
||||
@ -2287,8 +2300,8 @@ class ForeachCursor : public Cursor {
|
||||
return false;
|
||||
}
|
||||
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
storage::v3::View::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||
context.shard_request_manager, storage::v3::View::NEW);
|
||||
TypedValue expr_result = expression->Accept(evaluator);
|
||||
|
||||
if (expr_result.IsNull()) {
|
||||
@ -2458,15 +2471,51 @@ class DistributedExpandCursor : public Cursor {
|
||||
: self_(self),
|
||||
input_cursor_(self.input_->MakeCursor(mem)),
|
||||
current_in_edge_it_(current_in_edges_.begin()),
|
||||
current_out_edge_it_(current_out_edges_.begin()) {
|
||||
if (self_.common_.existing_node) {
|
||||
throw QueryRuntimeException("Cannot use existing node with DistributedExpandOne cursor!");
|
||||
}
|
||||
}
|
||||
current_out_edge_it_(current_out_edges_.begin()) {}
|
||||
|
||||
using VertexAccessor = accessors::VertexAccessor;
|
||||
using EdgeAccessor = accessors::EdgeAccessor;
|
||||
|
||||
static constexpr auto DirectionToMsgsDirection(const auto direction) {
|
||||
switch (direction) {
|
||||
case EdgeAtom::Direction::IN:
|
||||
return msgs::EdgeDirection::IN;
|
||||
case EdgeAtom::Direction::OUT:
|
||||
return msgs::EdgeDirection::OUT;
|
||||
case EdgeAtom::Direction::BOTH:
|
||||
return msgs::EdgeDirection::BOTH;
|
||||
}
|
||||
};
void PullDstVertex(Frame &frame, ExecutionContext &context, const EdgeAtom::Direction direction) {
if (self_.common_.existing_node) {
return;
}
MG_ASSERT(direction != EdgeAtom::Direction::BOTH);
const auto &edge = frame[self_.common_.edge_symbol].ValueEdge();
static auto get_dst_vertex = [&edge](const EdgeAtom::Direction direction) {
switch (direction) {
case EdgeAtom::Direction::IN:
return edge.From().Id();
case EdgeAtom::Direction::OUT:
return edge.To().Id();
case EdgeAtom::Direction::BOTH:
throw std::runtime_error("EdgeDirection Both not implemented");
}
};
msgs::ExpandOneRequest request;
// to not fetch any properties of the edges
request.edge_properties.emplace();
request.src_vertices.push_back(get_dst_vertex(direction));
request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN;
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front();
frame[self_.common_.node_symbol] = accessors::VertexAccessor(
msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.shard_request_manager);
}

bool InitEdges(Frame &frame, ExecutionContext &context) {
// Input Vertex could be null if it is created by a failed optional match. In
// those cases we skip that input pull and continue with the next.
@ -2480,44 +2529,41 @@ class DistributedExpandCursor : public Cursor {

ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
auto &vertex = vertex_value.ValueVertex();
static constexpr auto direction_to_msgs_direction = [](const EdgeAtom::Direction direction) {
switch (direction) {
case EdgeAtom::Direction::IN:
return msgs::EdgeDirection::IN;
case EdgeAtom::Direction::OUT:
return msgs::EdgeDirection::OUT;
case EdgeAtom::Direction::BOTH:
return msgs::EdgeDirection::BOTH;
}
};

msgs::ExpandOneRequest request;
request.direction = direction_to_msgs_direction(self_.common_.direction);
request.direction = DirectionToMsgsDirection(self_.common_.direction);
// to not fetch any properties of the edges
request.edge_properties.emplace();
request.src_vertices.push_back(vertex.Id());
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front();

const auto convert_edges = [&vertex](
if (self_.common_.existing_node) {
const auto &node = frame[self_.common_.node_symbol].ValueVertex().Id();
auto &in = result_row.in_edges_with_specific_properties;
std::erase_if(in, [&node](auto &edge) { return edge.other_end != node; });
auto &out = result_row.out_edges_with_specific_properties;
std::erase_if(out, [&node](auto &edge) { return edge.other_end != node; });
}

const auto convert_edges = [&vertex, &context](
std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties> &&edge_messages,
const EdgeAtom::Direction direction) {
std::vector<EdgeAccessor> edge_accessors;
edge_accessors.reserve(edge_messages.size());

switch (direction) {
case EdgeAtom::Direction::IN: {
for (auto &edge : edge_messages) {
edge_accessors.emplace_back(
msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type});
edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type},
context.shard_request_manager);
}
break;
}
case EdgeAtom::Direction::OUT: {
for (auto &edge : edge_messages) {
edge_accessors.emplace_back(
msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type});
edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type},
context.shard_request_manager);
}
break;
}
@ -2527,12 +2573,13 @@ class DistributedExpandCursor : public Cursor {
}
return edge_accessors;
};

current_in_edges_ =
convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::IN);
current_in_edge_it_ = current_in_edges_.begin();
current_in_edges_ =
convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::OUT);
current_in_edge_it_ = current_in_edges_.begin();
current_out_edges_ =
convert_edges(std::move(result_row.out_edges_with_specific_properties), EdgeAtom::Direction::OUT);
current_out_edge_it_ = current_out_edges_.begin();
return true;
}
}
@ -2540,19 +2587,6 @@ class DistributedExpandCursor : public Cursor {
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("DistributedExpand");
// A helper function for expanding a node from an edge.
auto pull_node = [this, &frame](const EdgeAccessor &new_edge, EdgeAtom::Direction direction) {
if (self_.common_.existing_node) return;
switch (direction) {
case EdgeAtom::Direction::IN:
frame[self_.common_.node_symbol] = new_edge.From();
break;
case EdgeAtom::Direction::OUT:
frame[self_.common_.node_symbol] = new_edge.To();
break;
case EdgeAtom::Direction::BOTH:
LOG_FATAL("Must indicate exact expansion direction here");
}
};

while (true) {
if (MustAbort(context)) throw HintedAbortError();
@ -2561,7 +2595,7 @@ class DistributedExpandCursor : public Cursor {
auto &edge = *current_in_edge_it_;
++current_in_edge_it_;
frame[self_.common_.edge_symbol] = edge;
pull_node(edge, EdgeAtom::Direction::IN);
PullDstVertex(frame, context, EdgeAtom::Direction::IN);
return true;
}

@ -2573,7 +2607,7 @@ class DistributedExpandCursor : public Cursor {
continue;
};
frame[self_.common_.edge_symbol] = edge;
pull_node(edge, EdgeAtom::Direction::OUT);
PullDstVertex(frame, context, EdgeAtom::Direction::OUT);
return true;
}
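
A note on request.edge_properties.emplace() in the requests built above: engaging the optional with an empty list is what asks the storage side to fetch no edge properties. A standalone sketch of that convention in plain standard C++, assuming (not confirmed by this diff) that a disengaged optional means "no filter, fetch everything":

#include <iostream>
#include <optional>
#include <string>
#include <vector>

int main() {
  // Disengaged optional: interpreted here as "return all properties" (assumption for this sketch).
  std::optional<std::vector<std::string>> edge_properties;
  std::cout << std::boolalpha << edge_properties.has_value() << '\n';  // false

  // Engaged but empty list: an explicit filter that matches nothing, i.e. fetch no edge properties.
  edge_properties.emplace();
  std::cout << edge_properties.has_value() << ' ' << edge_properties->empty() << '\n';  // true true
}
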
@ -387,7 +387,6 @@ struct GetPropertiesResponse {
enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 };

struct ExpandOneRequest {
// TODO(antaljanosbenjamin): Filtering based on the id of the other end of the edge?
Hlc transaction_id;
std::vector<VertexId> src_vertices;
// return types that type is in this list
@ -171,6 +171,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {

if (hlc_response.fresher_shard_map) {
shards_map_ = hlc_response.fresher_shard_map.value();
SetUpNameIdMappers();
}
}

@ -186,6 +187,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {

if (hlc_response.fresher_shard_map) {
shards_map_ = hlc_response.fresher_shard_map.value();
SetUpNameIdMappers();
}
auto commit_timestamp = hlc_response.new_hlc;

@ -223,14 +225,14 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return shards_map_.GetLabelId(name).value();
}

const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const override {
return shards_map_.GetPropertyName(prop);
const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override {
return properties_.IdToName(id.AsUint());
}
const std::string &LabelToName(memgraph::storage::v3::LabelId label) const override {
return shards_map_.GetLabelName(label);
const std::string &LabelToName(memgraph::storage::v3::LabelId id) const override {
return labels_.IdToName(id.AsUint());
}
const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const override {
return shards_map_.GetEdgeTypeName(type);
const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId id) const override {
return edge_types_.IdToName(id.AsUint());
}

bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override {
@ -358,7 +360,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
std::vector<VertexAccessor> accessors;
for (auto &response : responses) {
for (auto &result_row : response.results) {
accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props)));
accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props), this));
}
}
return accessors;
@ -697,7 +699,28 @@ class ShardRequestManager : public ShardRequestManagerInterface {
}
}

void SetUpNameIdMappers() {
std::unordered_map<uint64_t, std::string> id_to_name;
for (const auto &[name, id] : shards_map_.labels) {
id_to_name.emplace(id.AsUint(), name);
}
labels_.StoreMapping(std::move(id_to_name));
id_to_name.clear();
for (const auto &[name, id] : shards_map_.properties) {
id_to_name.emplace(id.AsUint(), name);
}
properties_.StoreMapping(std::move(id_to_name));
id_to_name.clear();
for (const auto &[name, id] : shards_map_.edge_types) {
id_to_name.emplace(id.AsUint(), name);
}
edge_types_.StoreMapping(std::move(id_to_name));
}

ShardMap shards_map_;
storage::v3::NameIdMapper properties_;
storage::v3::NameIdMapper edge_types_;
storage::v3::NameIdMapper labels_;
CoordinatorClient coord_cli_;
RsmStorageClientManager<StorageClient> storage_cli_manager_;
memgraph::io::Io<TTransport> io_;
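
A minimal standalone sketch of the mapping setup above (plain standard C++ with illustrative data, not the Memgraph ShardMap or NameIdMapper API): each name-to-id map is inverted once into an id-to-name hash map, so the later IdToName lookups stay constant time.

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <unordered_map>

int main() {
  // Hypothetical name->id mapping, as it would arrive with a fresher shard map.
  const std::map<std::string, uint64_t> labels{{"City", 2}, {"Person", 1}};

  // Invert it into an id->name map, mirroring what StoreMapping would receive.
  std::unordered_map<uint64_t, std::string> id_to_name;
  for (const auto &[name, id] : labels) {
    id_to_name.emplace(id, name);
  }

  std::cout << id_to_name.at(1) << '\n';  // prints "Person"
}
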
@ -53,6 +53,10 @@ class NameIdMapper final {
return it->second;
}

const auto &GetIdToNameMap() const { return id_to_name_; }

const auto &GetNameToIdMap() const { return name_to_id_; }

private:
// Necessary for comparison with string_view and string
// https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0919r1.html
@ -17,6 +17,7 @@

#include "parser/opencypher/parser.hpp"
#include "query/v2/requests.hpp"
#include "storage/v2/view.hpp"
#include "storage/v3/bindings/ast/ast.hpp"
#include "storage/v3/bindings/cypher_main_visitor.hpp"
#include "storage/v3/bindings/db_accessor.hpp"
@ -113,6 +114,24 @@ std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor
return ret;
}

std::optional<std::map<PropertyId, Value>> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view,
const Schemas::Schema *schema) {
std::map<PropertyId, Value> ret;
auto props = acc.Properties(view);
auto maybe_pk = acc.PrimaryKey(view);
if (maybe_pk.HasError()) {
spdlog::debug("Encountered an error while trying to get vertex primary key.");
return std::nullopt;
}
auto &pk = maybe_pk.GetValue();
MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!");
for (size_t i{0}; i < schema->second.size(); ++i) {
ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i])));
}

return ret;
}

std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view,
const Schemas::Schema *schema) {
std::map<PropertyId, Value> ret;
@ -129,17 +148,9 @@ std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(cons
});
properties.clear();

// TODO(antaljanosbenjamin): Once the VertexAccessor::Properties returns also the primary keys, we can get rid of this
// code.
auto maybe_pk = acc.PrimaryKey(view);
if (maybe_pk.HasError()) {
spdlog::debug("Encountered an error while trying to get vertex primary key.");
}

auto &pk = maybe_pk.GetValue();
MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!");
for (size_t i{0}; i < schema->second.size(); ++i) {
ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i])));
auto pks = PrimaryKeysFromAccessor(acc, view, schema);
if (pks) {
ret.merge(*pks);
}

return ret;
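
The ret.merge(*pks) call above relies on std::map::merge node-transfer semantics: keys already present in the destination are kept, and only the remaining entries are moved over. A standalone sketch:

#include <iostream>
#include <map>
#include <string>

int main() {
  std::map<int, std::string> properties{{1, "name"}};
  std::map<int, std::string> primary_keys{{1, "ignored"}, {2, "id"}};

  // Existing key 1 stays as "name"; key 2 is moved into properties.
  properties.merge(primary_keys);

  for (const auto &[key, value] : properties) {
    std::cout << key << " -> " << value << '\n';
  }
  std::cout << "left behind: " << primary_keys.size() << '\n';  // 1 (the colliding key)
}
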
@ -190,7 +201,9 @@ std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccesso
}

std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc,
const msgs::ExpandOneRequest &req) {
const msgs::ExpandOneRequest &req,
storage::v3::View view,
const Schemas::Schema *schema) {
std::map<PropertyId, Value> src_vertex_properties;

if (!req.src_vertex_properties) {
@ -204,6 +217,10 @@ std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const st
for (auto &[key, val] : props.GetValue()) {
src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val))));
}
auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema);
if (pks) {
src_vertex_properties.merge(*pks);
}

} else if (req.src_vertex_properties.value().empty()) {
// NOOP
@ -264,7 +281,6 @@ std::optional<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges(
return std::nullopt;
}
in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN);

auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types);
if (out_edges_result.HasError()) {
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
@ -303,7 +319,8 @@ bool FillEdges(const std::vector<EdgeAccessor> &edges, msgs::ExpandOneResultRow

std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler) {
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
const Schemas::Schema *schema) {
/// Fill up source vertex
const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second));
auto v_acc = acc.FindVertex(primary_key, View::NEW);
@ -312,9 +329,9 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
if (!source_vertex) {
return std::nullopt;
}
std::optional<std::map<PropertyId, Value>> src_vertex_properties;
src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema);

/// Fill up source vertex properties
auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req);
if (!src_vertex_properties) {
return std::nullopt;
}
@ -852,7 +869,8 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
continue;
}
}
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler);
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler,
shard_->GetSchema(shard_->PrimaryLabel()));

if (!result) {
action_successful = false;
|
||||
return in;
|
||||
}
|
||||
|
||||
template <typename K, typename V>
|
||||
std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
|
||||
namespace detail {
|
||||
template <typename T>
|
||||
std::ostream &MapImpl(std::ostream &in, const T &map) {
|
||||
in << "{";
|
||||
bool first = true;
|
||||
for (const auto &[a, b] : map) {
|
||||
@ -49,6 +50,17 @@ std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
|
||||
in << "}";
|
||||
return in;
|
||||
}
|
||||
} // namespace detail
|
||||
|
||||
template <typename K, typename V>
|
||||
std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
|
||||
return detail::MapImpl(in, map);
|
||||
}
|
||||
|
||||
template <typename K, typename V, typename THash, typename Cmp>
|
||||
std::ostream &operator<<(std::ostream &in, const std::unordered_map<K, V, THash, Cmp> &map) {
|
||||
return detail::MapImpl(in, map);
|
||||
}
|
||||
|
||||
template <typename K, typename V>
|
||||
std::ostream &operator<<(std::ostream &in, const std::unordered_map<K, V> &map) {
|
||||
|
@ -3,3 +3,8 @@ function(distributed_queries_e2e_python_files FILE_NAME)
|
||||
endfunction()
|
||||
|
||||
distributed_queries_e2e_python_files(distributed_queries.py)
|
||||
distributed_queries_e2e_python_files(unwind_collect.py)
|
||||
distributed_queries_e2e_python_files(order_by_and_limit.py)
|
||||
distributed_queries_e2e_python_files(distinct.py)
|
||||
distributed_queries_e2e_python_files(optional_match.py)
|
||||
distributed_queries_e2e_python_files(common.py)
|
||||
|
44
tests/e2e/distributed_queries/common.py
Normal file
@ -0,0 +1,44 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.

import typing
import mgclient
import sys
import pytest
import time


@pytest.fixture(autouse=True)
def connection():
    connection = connect()
    return connection


def connect(**kwargs) -> mgclient.Connection:
    connection = mgclient.connect(host="localhost", port=7687, **kwargs)
    connection.autocommit = True
    return connection


def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
    cursor.execute(query, params)
    return cursor.fetchall()


def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
    results = execute_and_fetch_all(cursor, query)
    return len(results) == n


def wait_for_shard_manager_to_initialize():
    # The ShardManager in memgraph takes some time to initialize
    # the shards, thus we cannot just run the queries right away
    time.sleep(3)
38
tests/e2e/distributed_queries/distinct.py
Normal file
@ -0,0 +1,38 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.

import typing
import mgclient
import sys
import pytest
import time
from common import *


def test_distinct(connection):
    wait_for_shard_manager_to_initialize()
    cursor = connection.cursor()

    assert has_n_result_row(cursor, "CREATE (n :label {property:0})", 0)
    assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
    assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0)
    assert has_n_result_row(cursor, "MATCH (n)-[r]->(m) RETURN r", 4)

    results = execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN DISTINCT m")
    assert len(results) == 2
    for i, n in enumerate(results):
        n_props = n[0].properties
        assert len(n_props) == 1
        assert n_props["property"] == i


if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))
@ -14,34 +14,7 @@ import mgclient
import sys
import pytest
import time


@pytest.fixture(autouse=True)
def connection():
    connection = connect()
    return connection


def connect(**kwargs) -> mgclient.Connection:
    connection = mgclient.connect(host="localhost", port=7687, **kwargs)
    connection.autocommit = True
    return connection


def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
    cursor.execute(query, params)
    return cursor.fetchall()


def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
    results = execute_and_fetch_all(cursor, query)
    return len(results) == n


def wait_for_shard_manager_to_initialize():
    # The ShardManager in memgraph takes some time to initialize
    # the shards, thus we cannot just run the queries right away
    time.sleep(3)
from common import *


def test_vertex_creation_and_scanall(connection):
@ -62,7 +35,7 @@ def test_vertex_creation_and_scanall(connection):
    assert len(results) == 9
    for (n, r, m) in results:
        n_props = n.properties
        assert len(n_props) == 0, "n is not expected to have properties, update the test!"
        assert len(n_props) == 1, "n is expected to have exactly one property, update the test!"
        assert len(n.labels) == 0, "n is not expected to have labels, update the test!"

        assert r.type == "TO"
37
tests/e2e/distributed_queries/optional_match.py
Normal file
@ -0,0 +1,37 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.

import typing
import mgclient
import sys
import pytest
import time
from common import *


def test_optional_match(connection):
    wait_for_shard_manager_to_initialize()
    cursor = connection.cursor()

    assert has_n_result_row(cursor, "CREATE (n :label {property:0})", 0)

    results = execute_and_fetch_all(
        cursor, "MATCH (n:label) OPTIONAL MATCH (n:label)-[:TO]->(parent:label) RETURN parent"
    )
    assert len(results) == 1

    assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
    assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0)
    assert has_n_result_row(cursor, "MATCH (n:label) OPTIONAL MATCH (n)-[r:TO]->(m:label) RETURN r", 4)


if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))
44
tests/e2e/distributed_queries/order_by_and_limit.py
Normal file
@ -0,0 +1,44 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.

import typing
import mgclient
import sys
import pytest
import time
from common import *


def test_order_by_and_limit(connection):
    wait_for_shard_manager_to_initialize()
    cursor = connection.cursor()

    assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
    assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
    assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0)
    assert has_n_result_row(cursor, "CREATE (n :label {property:4})", 0)

    results = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property DESC")
    assert len(results) == 4
    i = 4
    for n in results:
        n_props = n[0].properties
        assert len(n_props) == 1
        assert n_props["property"] == i
        i = i - 1

    result = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property LIMIT 1")
    assert len(result) == 1
    assert result[0][0].properties["property"] == 1


if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))
34
tests/e2e/distributed_queries/unwind_collect.py
Normal file
@ -0,0 +1,34 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.

import typing
import mgclient
import sys
import pytest
import time
from common import *


def test_collect_unwind(connection):
    wait_for_shard_manager_to_initialize()
    cursor = connection.cursor()

    assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
    assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
    assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0)
    assert has_n_result_row(cursor, "CREATE (n :label {property:4})", 0)

    assert has_n_result_row(cursor, "MATCH (n) WITH collect(n) AS result RETURN result", 1)
    assert has_n_result_row(cursor, "MATCH (n) WITH collect(n) AS nd UNWIND nd AS result RETURN result", 4)


if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))
@ -11,3 +11,23 @@ workloads:
    binary: "tests/e2e/pytest_runner.sh"
    args: ["distributed_queries/distributed_queries.py"]
    <<: *template_cluster

  - name: "Distributed unwind collect"
    binary: "tests/e2e/pytest_runner.sh"
    args: ["distributed_queries/unwind_collect.py"]
    <<: *template_cluster

  - name: "Distributed order by and limit"
    binary: "tests/e2e/pytest_runner.sh"
    args: ["distributed_queries/order_by_and_limit.py"]
    <<: *template_cluster

  - name: "Distributed distinct"
    binary: "tests/e2e/pytest_runner.sh"
    args: ["distributed_queries/distinct.py"]
    <<: *template_cluster

  - name: "Distributed optional match"
    binary: "tests/e2e/pytest_runner.sh"
    args: ["distributed_queries/optional_match.py"]
    <<: *template_cluster
@ -341,14 +341,14 @@ class AccessControl(Dataset):
    def _get_random_uuid(self, type):
        assert type in ["File", "Permission", "Identity"]

        first_uuid = Dataset.get_size(self)["uuid_ranges"][type]["first_uuid"]
        last_uuid = Dataset.get_size(self)["uuid_ranges"][type]["last_uuid"]
        first_uuid = self.get_size()["uuid_ranges"][type]["first_uuid"]
        last_uuid = self.get_size()["uuid_ranges"][type]["last_uuid"]

        random_value = random.randint(first_uuid, last_uuid)
        return random_value

    def __init__(self, variant=None):
        Dataset.__init__(self, variant)
        super().__init__(variant)
        self.next_value_idx = Dataset.get_size(self)["vertices"] + 1

    def benchmark__create__vertex(self):
@ -12,7 +12,7 @@ PIP_DEPS=(
"neo4j-driver==4.1.1"
"parse==1.18.0"
"parse-type==0.5.2"
"pytest==6.2.3"
"pytest==6.2.5"
"pyyaml==5.4.1"
"six==1.15.0"
)
@ -36,12 +36,12 @@ PYTHON_MINOR=$(python3 -c 'import sys; print(sys.version_info[:][1])')
# NOTE (2021-11-15): PyPi doesn't contain pulsar-client for Python 3.9 so we have to use
# our manually built wheel file. When they update the repository, pulsar-client can be
# added as a regular PIP dependency
if [ $PYTHON_MINOR -lt 9 ]; then
pip --timeout 1000 install "pulsar-client==2.8.1"
else
pip --timeout 1000 install https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pulsar_client-2.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl
fi

#if [ $PYTHON_MINOR -lt 9 ]; then
# pip --timeout 1000 install "pulsar-client==2.8.1"
#else
# pip --timeout 1000 install https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pulsar_client-2.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl
#fi
#
for pkg in "${PIP_DEPS[@]}"; do
pip --timeout 1000 install "$pkg"
done