Merge branch 'project-pineapples' into T1185-MG-replace-skip-list
This commit is contained in:
commit
c39f264684
@ -42,4 +42,19 @@ class Frame {
|
||||
utils::pmr::vector<TypedValue> elems_;
|
||||
};
|
||||
|
||||
template <typename TypedValue>
|
||||
class FrameWithValidity final : public Frame<TypedValue> {
|
||||
public:
|
||||
explicit FrameWithValidity(int64_t size) : Frame<TypedValue>(size), is_valid_(false) {}
|
||||
|
||||
FrameWithValidity(int64_t size, utils::MemoryResource *memory) : Frame<TypedValue>(size, memory), is_valid_(false) {}
|
||||
|
||||
bool IsValid() const noexcept { return is_valid_; }
|
||||
void MakeValid() noexcept { is_valid_ = true; }
|
||||
void MakeInvalid() noexcept { is_valid_ = false; }
|
||||
|
||||
private:
|
||||
bool is_valid_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::expr
|
||||
|
@ -23,7 +23,8 @@ set(mg_query_v2_sources
|
||||
plan/variable_start_planner.cpp
|
||||
serialization/property_value.cpp
|
||||
bindings/typed_value.cpp
|
||||
accessors.cpp)
|
||||
accessors.cpp
|
||||
multiframe.cpp)
|
||||
|
||||
find_package(Boost REQUIRED)
|
||||
|
||||
|
@ -13,9 +13,10 @@
|
||||
|
||||
#include "query/v2/bindings/bindings.hpp"
|
||||
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
#include "expr/interpret/frame.hpp"
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
using Frame = memgraph::expr::Frame<TypedValue>;
|
||||
} // namespace memgraph::query::v2
|
||||
using FrameWithValidity = memgraph::expr::FrameWithValidity<TypedValue>;
|
||||
} // namespace memgraph::query::v2
|
||||
|
@ -41,6 +41,7 @@
|
||||
#include "query/v2/frontend/ast/ast.hpp"
|
||||
#include "query/v2/frontend/semantic/required_privileges.hpp"
|
||||
#include "query/v2/metadata.hpp"
|
||||
#include "query/v2/multiframe.hpp"
|
||||
#include "query/v2/plan/planner.hpp"
|
||||
#include "query/v2/plan/profile.hpp"
|
||||
#include "query/v2/plan/vertex_count_cache.hpp"
|
||||
@ -655,11 +656,15 @@ struct PullPlan {
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary);
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullMultiple(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary);
|
||||
|
||||
private:
|
||||
std::shared_ptr<CachedPlan> plan_ = nullptr;
|
||||
plan::UniqueCursorPtr cursor_ = nullptr;
|
||||
expr::Frame<TypedValue> frame_;
|
||||
expr::FrameWithValidity<TypedValue> frame_;
|
||||
MultiFrame multi_frame_;
|
||||
ExecutionContext ctx_;
|
||||
std::optional<size_t> memory_limit_;
|
||||
|
||||
@ -683,6 +688,7 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
|
||||
: plan_(plan),
|
||||
cursor_(plan->plan().MakeCursor(execution_memory)),
|
||||
frame_(plan->symbol_table().max_position(), execution_memory),
|
||||
multi_frame_(plan->symbol_table().max_position(), kNumberOfFramesInMultiframe, execution_memory),
|
||||
memory_limit_(memory_limit) {
|
||||
ctx_.db_accessor = dba;
|
||||
ctx_.symbol_table = plan->symbol_table();
|
||||
@ -699,9 +705,116 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
|
||||
ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc;
|
||||
}
|
||||
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary) {
|
||||
// Set up temporary memory for a single Pull. Initial memory comes from the
|
||||
// stack. 256 KiB should fit on the stack and should be more than enough for a
|
||||
// single `Pull`.
|
||||
MG_ASSERT(!n.has_value(), "should pull all!");
|
||||
static constexpr size_t stack_size = 256UL * 1024UL;
|
||||
char stack_data[stack_size];
|
||||
utils::ResourceWithOutOfMemoryException resource_with_exception;
|
||||
utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
|
||||
// We can throw on every query because a simple queries for deleting will use only
|
||||
// the stack allocated buffer.
|
||||
// Also, we want to throw only when the query engine requests more memory and not the storage
|
||||
// so we add the exception to the allocator.
|
||||
// TODO (mferencevic): Tune the parameters accordingly.
|
||||
utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
|
||||
std::optional<utils::LimitedMemoryResource> maybe_limited_resource;
|
||||
|
||||
if (memory_limit_) {
|
||||
maybe_limited_resource.emplace(&pool_memory, *memory_limit_);
|
||||
ctx_.evaluation_context.memory = &*maybe_limited_resource;
|
||||
} else {
|
||||
ctx_.evaluation_context.memory = &pool_memory;
|
||||
}
|
||||
|
||||
// Returns true if a result was pulled.
|
||||
const auto pull_result = [&]() -> bool {
|
||||
cursor_->PullMultiple(multi_frame_, ctx_);
|
||||
return multi_frame_.HasValidFrame();
|
||||
};
|
||||
|
||||
const auto stream_values = [&output_symbols, &stream](const Frame &frame) {
|
||||
// TODO: The streamed values should also probably use the above memory.
|
||||
std::vector<TypedValue> values;
|
||||
values.reserve(output_symbols.size());
|
||||
|
||||
for (const auto &symbol : output_symbols) {
|
||||
values.emplace_back(frame[symbol]);
|
||||
}
|
||||
|
||||
stream->Result(values);
|
||||
};
|
||||
|
||||
// Get the execution time of all possible result pulls and streams.
|
||||
utils::Timer timer;
|
||||
|
||||
int i = 0;
|
||||
if (has_unsent_results_ && !output_symbols.empty()) {
|
||||
// stream unsent results from previous pull
|
||||
|
||||
auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader();
|
||||
for (const auto &frame : iterator_for_valid_frame_only) {
|
||||
stream_values(frame);
|
||||
++i;
|
||||
}
|
||||
multi_frame_.MakeAllFramesInvalid();
|
||||
}
|
||||
|
||||
for (; !n || i < n;) {
|
||||
if (!pull_result()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!output_symbols.empty()) {
|
||||
auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader();
|
||||
for (const auto &frame : iterator_for_valid_frame_only) {
|
||||
stream_values(frame);
|
||||
++i;
|
||||
}
|
||||
}
|
||||
multi_frame_.MakeAllFramesInvalid();
|
||||
}
|
||||
|
||||
// If we finished because we streamed the requested n results,
|
||||
// we try to pull the next result to see if there is more.
|
||||
// If there is additional result, we leave the pulled result in the frame
|
||||
// and set the flag to true.
|
||||
has_unsent_results_ = i == n && pull_result();
|
||||
|
||||
execution_time_ += timer.Elapsed();
|
||||
|
||||
if (has_unsent_results_) {
|
||||
return std::nullopt;
|
||||
}
|
||||
summary->insert_or_assign("plan_execution_time", execution_time_.count());
|
||||
// We are finished with pulling all the data, therefore we can send any
|
||||
// metadata about the results i.e. notifications and statistics
|
||||
const bool is_any_counter_set =
|
||||
std::any_of(ctx_.execution_stats.counters.begin(), ctx_.execution_stats.counters.end(),
|
||||
[](const auto &counter) { return counter > 0; });
|
||||
if (is_any_counter_set) {
|
||||
std::map<std::string, TypedValue> stats;
|
||||
for (size_t i = 0; i < ctx_.execution_stats.counters.size(); ++i) {
|
||||
stats.emplace(ExecutionStatsKeyToString(ExecutionStats::Key(i)), ctx_.execution_stats.counters[i]);
|
||||
}
|
||||
summary->insert_or_assign("stats", std::move(stats));
|
||||
}
|
||||
cursor_->Shutdown();
|
||||
ctx_.profile_execution_time = execution_time_;
|
||||
return GetStatsWithTotalTime(ctx_);
|
||||
}
|
||||
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary) {
|
||||
auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple
|
||||
if (should_pull_multiple) {
|
||||
return PullMultiple(stream, n, output_symbols, summary);
|
||||
}
|
||||
// Set up temporary memory for a single Pull. Initial memory comes from the
|
||||
// stack. 256 KiB should fit on the stack and should be more than enough for a
|
||||
// single `Pull`.
|
||||
|
127
src/query/v2/multiframe.cpp
Normal file
127
src/query/v2/multiframe.cpp
Normal file
@ -0,0 +1,127 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "query/v2/multiframe.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <iterator>
|
||||
|
||||
#include "query/v2/bindings/frame.hpp"
|
||||
#include "utils/pmr/vector.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
|
||||
static_assert(std::forward_iterator<ValidFramesReader::Iterator>);
|
||||
static_assert(std::forward_iterator<ValidFramesModifier::Iterator>);
|
||||
static_assert(std::forward_iterator<ValidFramesConsumer::Iterator>);
|
||||
static_assert(std::forward_iterator<InvalidFramesPopulator::Iterator>);
|
||||
|
||||
MultiFrame::MultiFrame(int64_t size_of_frame, size_t number_of_frames, utils::MemoryResource *execution_memory)
|
||||
: frames_(utils::pmr::vector<FrameWithValidity>(
|
||||
number_of_frames, FrameWithValidity(size_of_frame, execution_memory), execution_memory)) {
|
||||
MG_ASSERT(number_of_frames > 0);
|
||||
}
|
||||
|
||||
MultiFrame::MultiFrame(const MultiFrame &other) : frames_{other.frames_} {}
|
||||
|
||||
// NOLINTNEXTLINE (bugprone-exception-escape)
|
||||
MultiFrame::MultiFrame(MultiFrame &&other) noexcept : frames_(std::move(other.frames_)) {}
|
||||
|
||||
FrameWithValidity &MultiFrame::GetFirstFrame() {
|
||||
MG_ASSERT(!frames_.empty());
|
||||
return frames_.front();
|
||||
}
|
||||
|
||||
void MultiFrame::MakeAllFramesInvalid() noexcept {
|
||||
std::for_each(frames_.begin(), frames_.end(), [](auto &frame) { frame.MakeInvalid(); });
|
||||
}
|
||||
|
||||
bool MultiFrame::HasValidFrame() const noexcept {
|
||||
return std::any_of(frames_.begin(), frames_.end(), [](auto &frame) { return frame.IsValid(); });
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE (bugprone-exception-escape)
|
||||
void MultiFrame::DefragmentValidFrames() noexcept {
|
||||
/*
|
||||
from: https://en.cppreference.com/w/cpp/algorithm/remove
|
||||
"Removing is done by shifting (by means of copy assignment (until C++11)move assignment (since C++11)) the elements
|
||||
in the range in such a way that the elements that are not to be removed appear in the beginning of the range.
|
||||
Relative order of the elements that remain is preserved and the physical size of the container is unchanged."
|
||||
*/
|
||||
|
||||
// NOLINTNEXTLINE (bugprone-unused-return-value)
|
||||
std::remove_if(frames_.begin(), frames_.end(), [](auto &frame) { return !frame.IsValid(); });
|
||||
}
|
||||
|
||||
ValidFramesReader MultiFrame::GetValidFramesReader() { return ValidFramesReader{*this}; }
|
||||
|
||||
ValidFramesModifier MultiFrame::GetValidFramesModifier() { return ValidFramesModifier{*this}; }
|
||||
|
||||
ValidFramesConsumer MultiFrame::GetValidFramesConsumer() { return ValidFramesConsumer{*this}; }
|
||||
|
||||
InvalidFramesPopulator MultiFrame::GetInvalidFramesPopulator() { return InvalidFramesPopulator{*this}; }
|
||||
|
||||
ValidFramesReader::ValidFramesReader(MultiFrame &multiframe) : multiframe_(multiframe) {
|
||||
/*
|
||||
From: https://en.cppreference.com/w/cpp/algorithm/find
|
||||
Returns an iterator to the first element in the range [first, last) that satisfies specific criteria:
|
||||
find_if searches for an element for which predicate p returns true
|
||||
Return value
|
||||
Iterator to the first element satisfying the condition or last if no such element is found.
|
||||
|
||||
-> this is what we want. We want the "after" last valid frame (weather this is vector::end or and invalid frame).
|
||||
*/
|
||||
auto it = std::find_if(multiframe.frames_.begin(), multiframe.frames_.end(),
|
||||
[](const auto &frame) { return !frame.IsValid(); });
|
||||
after_last_valid_frame_ = multiframe_.frames_.data() + std::distance(multiframe.frames_.begin(), it);
|
||||
}
|
||||
|
||||
ValidFramesReader::Iterator ValidFramesReader::begin() { return Iterator{&multiframe_.frames_[0]}; }
|
||||
ValidFramesReader::Iterator ValidFramesReader::end() { return Iterator{after_last_valid_frame_}; }
|
||||
|
||||
ValidFramesModifier::ValidFramesModifier(MultiFrame &multiframe) : multiframe_(multiframe) {}
|
||||
|
||||
ValidFramesModifier::Iterator ValidFramesModifier::begin() { return Iterator{&multiframe_.frames_[0], *this}; }
|
||||
ValidFramesModifier::Iterator ValidFramesModifier::end() {
|
||||
return Iterator{multiframe_.frames_.data() + multiframe_.frames_.size(), *this};
|
||||
}
|
||||
|
||||
ValidFramesConsumer::ValidFramesConsumer(MultiFrame &multiframe) : multiframe_(multiframe) {}
|
||||
|
||||
// NOLINTNEXTLINE (bugprone-exception-escape)
|
||||
ValidFramesConsumer::~ValidFramesConsumer() noexcept {
|
||||
// TODO Possible optimisation: only DefragmentValidFrames if one frame has been invalidated? Only if does not
|
||||
// cost too much to store it
|
||||
multiframe_.DefragmentValidFrames();
|
||||
}
|
||||
|
||||
ValidFramesConsumer::Iterator ValidFramesConsumer::begin() { return Iterator{&multiframe_.frames_[0], *this}; }
|
||||
|
||||
ValidFramesConsumer::Iterator ValidFramesConsumer::end() {
|
||||
return Iterator{multiframe_.frames_.data() + multiframe_.frames_.size(), *this};
|
||||
}
|
||||
|
||||
InvalidFramesPopulator::InvalidFramesPopulator(MultiFrame &multiframe) : multiframe_(multiframe) {}
|
||||
|
||||
InvalidFramesPopulator::Iterator InvalidFramesPopulator::begin() {
|
||||
for (auto &frame : multiframe_.frames_) {
|
||||
if (!frame.IsValid()) {
|
||||
return Iterator{&frame};
|
||||
}
|
||||
}
|
||||
return end();
|
||||
}
|
||||
|
||||
InvalidFramesPopulator::Iterator InvalidFramesPopulator::end() {
|
||||
return Iterator{multiframe_.frames_.data() + multiframe_.frames_.size()};
|
||||
}
|
||||
|
||||
} // namespace memgraph::query::v2
|
302
src/query/v2/multiframe.hpp
Normal file
302
src/query/v2/multiframe.hpp
Normal file
@ -0,0 +1,302 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <iterator>
|
||||
|
||||
#include "query/v2/bindings/frame.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
constexpr uint64_t kNumberOfFramesInMultiframe = 1000; // TODO have it configurable
|
||||
|
||||
class ValidFramesConsumer;
|
||||
class ValidFramesModifier;
|
||||
class ValidFramesReader;
|
||||
class InvalidFramesPopulator;
|
||||
|
||||
class MultiFrame {
|
||||
public:
|
||||
friend class ValidFramesConsumer;
|
||||
friend class ValidFramesModifier;
|
||||
friend class ValidFramesReader;
|
||||
friend class InvalidFramesPopulator;
|
||||
|
||||
MultiFrame(int64_t size_of_frame, size_t number_of_frames, utils::MemoryResource *execution_memory);
|
||||
~MultiFrame() = default;
|
||||
|
||||
MultiFrame(const MultiFrame &other);
|
||||
MultiFrame(MultiFrame &&other) noexcept;
|
||||
MultiFrame &operator=(const MultiFrame &other) = delete;
|
||||
MultiFrame &operator=(MultiFrame &&other) noexcept = delete;
|
||||
|
||||
/*
|
||||
* Returns a object on which one can iterate in a for-loop. By doing so, you will only get Frames that are in a valid
|
||||
* state in the MultiFrame.
|
||||
* Iteration goes in a deterministic order.
|
||||
* One can't modify the validity of the Frame nor its content with this implementation.
|
||||
*/
|
||||
ValidFramesReader GetValidFramesReader();
|
||||
|
||||
/*
|
||||
* Returns a object on which one can iterate in a for-loop. By doing so, you will only get Frames that are in a valid
|
||||
* state in the MultiFrame.
|
||||
* Iteration goes in a deterministic order.
|
||||
* One can't modify the validity of the Frame with this implementation. One can modify its content.
|
||||
*/
|
||||
ValidFramesModifier GetValidFramesModifier();
|
||||
|
||||
/*
|
||||
* Returns a object on which one can iterate in a for-loop. By doing so, you will only get Frames that are in a valid
|
||||
* state in the MultiFrame.
|
||||
* Iteration goes in a deterministic order.
|
||||
* One can modify the validity of the Frame with this implementation.
|
||||
* If you do not plan to modify the validity of the Frames, use GetValidFramesReader/GetValidFramesModifer instead as
|
||||
* this is faster.
|
||||
*/
|
||||
ValidFramesConsumer GetValidFramesConsumer();
|
||||
|
||||
/*
|
||||
* Returns a object on which one can iterate in a for-loop. By doing so, you will only get Frames that are in an
|
||||
* invalid state in the MultiFrame. Iteration goes in a deterministic order. One can modify the validity of
|
||||
* the Frame with this implementation.
|
||||
*/
|
||||
InvalidFramesPopulator GetInvalidFramesPopulator();
|
||||
|
||||
/**
|
||||
* Return the first Frame of the MultiFrame. This is only meant to be used in very specific cases. Please consider
|
||||
* using the iterators instead.
|
||||
* The Frame can be valid or invalid.
|
||||
*/
|
||||
FrameWithValidity &GetFirstFrame();
|
||||
|
||||
void MakeAllFramesInvalid() noexcept;
|
||||
|
||||
bool HasValidFrame() const noexcept;
|
||||
|
||||
inline utils::MemoryResource *GetMemoryResource() { return frames_[0].GetMemoryResource(); }
|
||||
|
||||
private:
|
||||
void DefragmentValidFrames() noexcept;
|
||||
|
||||
utils::pmr::vector<FrameWithValidity> frames_;
|
||||
};
|
||||
|
||||
class ValidFramesReader {
|
||||
public:
|
||||
explicit ValidFramesReader(MultiFrame &multiframe);
|
||||
|
||||
~ValidFramesReader() = default;
|
||||
ValidFramesReader(const ValidFramesReader &other) = delete;
|
||||
ValidFramesReader(ValidFramesReader &&other) noexcept = delete;
|
||||
ValidFramesReader &operator=(const ValidFramesReader &other) = delete;
|
||||
ValidFramesReader &operator=(ValidFramesReader &&other) noexcept = delete;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = const Frame;
|
||||
using pointer = value_type *;
|
||||
using reference = const Frame &;
|
||||
|
||||
Iterator() = default;
|
||||
explicit Iterator(FrameWithValidity *ptr) : ptr_(ptr) {}
|
||||
|
||||
reference operator*() const { return *ptr_; }
|
||||
pointer operator->() { return ptr_; }
|
||||
|
||||
Iterator &operator++() {
|
||||
ptr_++;
|
||||
return *this;
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(cert-dcl21-cpp)
|
||||
Iterator operator++(int) {
|
||||
auto old = *this;
|
||||
ptr_++;
|
||||
return old;
|
||||
}
|
||||
|
||||
friend bool operator==(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ == rhs.ptr_; };
|
||||
friend bool operator!=(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ != rhs.ptr_; };
|
||||
|
||||
private:
|
||||
FrameWithValidity *ptr_{nullptr};
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
FrameWithValidity *after_last_valid_frame_;
|
||||
MultiFrame &multiframe_;
|
||||
};
|
||||
|
||||
class ValidFramesModifier {
|
||||
public:
|
||||
explicit ValidFramesModifier(MultiFrame &multiframe);
|
||||
|
||||
~ValidFramesModifier() = default;
|
||||
ValidFramesModifier(const ValidFramesModifier &other) = delete;
|
||||
ValidFramesModifier(ValidFramesModifier &&other) noexcept = delete;
|
||||
ValidFramesModifier &operator=(const ValidFramesModifier &other) = delete;
|
||||
ValidFramesModifier &operator=(ValidFramesModifier &&other) noexcept = delete;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = Frame;
|
||||
using pointer = value_type *;
|
||||
using reference = Frame &;
|
||||
|
||||
Iterator() = default;
|
||||
Iterator(FrameWithValidity *ptr, ValidFramesModifier &iterator_wrapper)
|
||||
: ptr_(ptr), iterator_wrapper_(&iterator_wrapper) {}
|
||||
|
||||
reference operator*() const { return *ptr_; }
|
||||
pointer operator->() { return ptr_; }
|
||||
|
||||
// Prefix increment
|
||||
Iterator &operator++() {
|
||||
do {
|
||||
ptr_++;
|
||||
} while (*this != iterator_wrapper_->end() && ptr_->IsValid());
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(cert-dcl21-cpp)
|
||||
Iterator operator++(int) {
|
||||
auto old = *this;
|
||||
++*this;
|
||||
return old;
|
||||
}
|
||||
|
||||
friend bool operator==(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ == rhs.ptr_; };
|
||||
friend bool operator!=(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ != rhs.ptr_; };
|
||||
|
||||
private:
|
||||
FrameWithValidity *ptr_{nullptr};
|
||||
ValidFramesModifier *iterator_wrapper_{nullptr};
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
MultiFrame &multiframe_;
|
||||
};
|
||||
|
||||
class ValidFramesConsumer {
|
||||
public:
|
||||
explicit ValidFramesConsumer(MultiFrame &multiframe);
|
||||
|
||||
~ValidFramesConsumer() noexcept;
|
||||
ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
|
||||
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
|
||||
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
|
||||
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = FrameWithValidity;
|
||||
using pointer = value_type *;
|
||||
using reference = FrameWithValidity &;
|
||||
|
||||
Iterator() = default;
|
||||
Iterator(FrameWithValidity *ptr, ValidFramesConsumer &iterator_wrapper)
|
||||
: ptr_(ptr), iterator_wrapper_(&iterator_wrapper) {}
|
||||
|
||||
reference operator*() const { return *ptr_; }
|
||||
pointer operator->() { return ptr_; }
|
||||
|
||||
Iterator &operator++() {
|
||||
do {
|
||||
ptr_++;
|
||||
} while (*this != iterator_wrapper_->end() && !ptr_->IsValid());
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(cert-dcl21-cpp)
|
||||
Iterator operator++(int) {
|
||||
auto old = *this;
|
||||
++*this;
|
||||
return old;
|
||||
}
|
||||
|
||||
friend bool operator==(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ == rhs.ptr_; };
|
||||
friend bool operator!=(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ != rhs.ptr_; };
|
||||
|
||||
private:
|
||||
FrameWithValidity *ptr_{nullptr};
|
||||
ValidFramesConsumer *iterator_wrapper_{nullptr};
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
MultiFrame &multiframe_;
|
||||
};
|
||||
|
||||
class InvalidFramesPopulator {
|
||||
public:
|
||||
explicit InvalidFramesPopulator(MultiFrame &multiframe);
|
||||
~InvalidFramesPopulator() = default;
|
||||
|
||||
InvalidFramesPopulator(const InvalidFramesPopulator &other) = delete;
|
||||
InvalidFramesPopulator(InvalidFramesPopulator &&other) noexcept = delete;
|
||||
InvalidFramesPopulator &operator=(const InvalidFramesPopulator &other) = delete;
|
||||
InvalidFramesPopulator &operator=(InvalidFramesPopulator &&other) noexcept = delete;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = FrameWithValidity;
|
||||
using pointer = value_type *;
|
||||
using reference = FrameWithValidity &;
|
||||
|
||||
Iterator() = default;
|
||||
explicit Iterator(FrameWithValidity *ptr) : ptr_(ptr) {}
|
||||
|
||||
reference operator*() const { return *ptr_; }
|
||||
pointer operator->() { return ptr_; }
|
||||
|
||||
Iterator &operator++() {
|
||||
ptr_->MakeValid();
|
||||
ptr_++;
|
||||
return *this;
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(cert-dcl21-cpp)
|
||||
Iterator operator++(int) {
|
||||
auto old = *this;
|
||||
++ptr_;
|
||||
return old;
|
||||
}
|
||||
|
||||
friend bool operator==(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ == rhs.ptr_; };
|
||||
friend bool operator!=(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ != rhs.ptr_; };
|
||||
|
||||
private:
|
||||
FrameWithValidity *ptr_{nullptr};
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
MultiFrame &multiframe_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::query::v2
|
@ -264,6 +264,16 @@ bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Once::OnceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
|
||||
SCOPED_PROFILE_OP("OnceMF");
|
||||
|
||||
if (!did_pull_) {
|
||||
auto &first_frame = multi_frame.GetFirstFrame();
|
||||
first_frame.MakeValid();
|
||||
did_pull_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
UniqueCursorPtr Once::MakeCursor(utils::MemoryResource *mem) const {
|
||||
EventCounter::IncrementCounter(EventCounter::OnceOperator);
|
||||
|
||||
@ -748,6 +758,23 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Produce::ProduceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
|
||||
SCOPED_PROFILE_OP("ProduceMF");
|
||||
|
||||
input_cursor_->PullMultiple(multi_frame, context);
|
||||
|
||||
auto iterator_for_valid_frame_only = multi_frame.GetValidFramesModifier();
|
||||
for (auto &frame : iterator_for_valid_frame_only) {
|
||||
// Produce should always yield the latest results.
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
|
||||
storage::v3::View::NEW);
|
||||
|
||||
for (auto *named_expr : self_.named_expressions_) {
|
||||
named_expr->Accept(evaluator);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void Produce::ProduceCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Produce::ProduceCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
@ -28,6 +28,7 @@
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
#include "query/v2/bindings/frame.hpp"
|
||||
#include "query/v2/bindings/symbol_table.hpp"
|
||||
#include "query/v2/multiframe.hpp"
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "utils/bound.hpp"
|
||||
#include "utils/fnv.hpp"
|
||||
@ -71,6 +72,8 @@ class Cursor {
|
||||
/// @throws QueryRuntimeException if something went wrong with execution
|
||||
virtual bool Pull(Frame &, ExecutionContext &) = 0;
|
||||
|
||||
virtual void PullMultiple(MultiFrame &, ExecutionContext &) { LOG_FATAL("PullMultipleIsNotImplemented"); }
|
||||
|
||||
/// Resets the Cursor to its initial state.
|
||||
virtual void Reset() = 0;
|
||||
|
||||
@ -332,6 +335,7 @@ and false on every following Pull.")
|
||||
class OnceCursor : public Cursor {
|
||||
public:
|
||||
OnceCursor() {}
|
||||
void PullMultiple(MultiFrame &, ExecutionContext &) override;
|
||||
bool Pull(Frame &, ExecutionContext &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
@ -1207,6 +1211,7 @@ RETURN clause) the Produce's pull succeeds exactly once.")
|
||||
public:
|
||||
ProduceCursor(const Produce &, utils::MemoryResource *);
|
||||
bool Pull(Frame &, ExecutionContext &) override;
|
||||
void PullMultiple(MultiFrame &, ExecutionContext &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <deque>
|
||||
#include <iostream>
|
||||
@ -112,6 +113,7 @@ class RequestRouterInterface {
|
||||
virtual std::vector<msgs::CreateVerticesResponse> CreateVertices(std::vector<msgs::NewVertex> new_vertices) = 0;
|
||||
virtual std::vector<msgs::ExpandOneResultRow> ExpandOne(msgs::ExpandOneRequest request) = 0;
|
||||
virtual std::vector<msgs::CreateExpandResponse> CreateExpand(std::vector<msgs::NewExpand> new_edges) = 0;
|
||||
virtual std::vector<msgs::GetPropertiesResultRow> GetProperties(msgs::GetPropertiesRequest request) = 0;
|
||||
|
||||
virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0;
|
||||
virtual storage::v3::PropertyId NameToProperty(const std::string &name) const = 0;
|
||||
@ -367,6 +369,28 @@ class RequestRouter : public RequestRouterInterface {
|
||||
return result_rows;
|
||||
}
|
||||
|
||||
std::vector<msgs::GetPropertiesResultRow> GetProperties(msgs::GetPropertiesRequest requests) override {
|
||||
ExecutionState<msgs::GetPropertiesRequest> state = {};
|
||||
InitializeExecutionState(state, std::move(requests));
|
||||
for (auto &request : state.requests) {
|
||||
auto &storage_client = GetStorageClientForShard(request.shard);
|
||||
msgs::ReadRequests req = request.request;
|
||||
request.async_request_token = storage_client.SendAsyncReadRequest(req);
|
||||
}
|
||||
|
||||
std::vector<msgs::GetPropertiesResponse> responses;
|
||||
do {
|
||||
DriveReadResponses(state, responses);
|
||||
} while (!state.requests.empty());
|
||||
|
||||
std::vector<msgs::GetPropertiesResultRow> result;
|
||||
for (auto &res : responses) {
|
||||
std::move(res.result_row.begin(), res.result_row.end(), std::back_inserter(result));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
std::optional<storage::v3::PropertyId> MaybeNameToProperty(const std::string &name) const override {
|
||||
return shards_map_.GetPropertyId(name);
|
||||
}
|
||||
@ -503,6 +527,44 @@ class RequestRouter : public RequestRouterInterface {
|
||||
}
|
||||
}
|
||||
|
||||
void InitializeExecutionState(ExecutionState<msgs::GetPropertiesRequest> &state, msgs::GetPropertiesRequest request) {
|
||||
std::map<Shard, msgs::GetPropertiesRequest> per_shard_request_table;
|
||||
auto top_level_rqst_template = request;
|
||||
top_level_rqst_template.transaction_id = transaction_id_;
|
||||
top_level_rqst_template.vertex_ids.clear();
|
||||
top_level_rqst_template.vertices_and_edges.clear();
|
||||
|
||||
state.transaction_id = transaction_id_;
|
||||
|
||||
for (auto &vertex : request.vertex_ids) {
|
||||
auto shard =
|
||||
shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second));
|
||||
if (!per_shard_request_table.contains(shard)) {
|
||||
per_shard_request_table.insert(std::pair(shard, top_level_rqst_template));
|
||||
}
|
||||
per_shard_request_table[shard].vertex_ids.emplace_back(std::move(vertex));
|
||||
}
|
||||
|
||||
for (auto &[vertex, maybe_edge] : request.vertices_and_edges) {
|
||||
auto shard =
|
||||
shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second));
|
||||
if (!per_shard_request_table.contains(shard)) {
|
||||
per_shard_request_table.insert(std::pair(shard, top_level_rqst_template));
|
||||
}
|
||||
per_shard_request_table[shard].vertices_and_edges.emplace_back(std::move(vertex), maybe_edge);
|
||||
}
|
||||
|
||||
for (auto &[shard, rqst] : per_shard_request_table) {
|
||||
ShardRequestState<msgs::GetPropertiesRequest> shard_request_state{
|
||||
.shard = shard,
|
||||
.request = std::move(rqst),
|
||||
.async_request_token = std::nullopt,
|
||||
};
|
||||
|
||||
state.requests.emplace_back(std::move(shard_request_state));
|
||||
}
|
||||
}
|
||||
|
||||
StorageClient &GetStorageClientForShard(Shard shard) {
|
||||
if (!storage_cli_manager_.Exists(shard)) {
|
||||
AddStorageClientToManager(shard);
|
||||
|
@ -327,10 +327,6 @@ struct Expression {
|
||||
std::string expression;
|
||||
};
|
||||
|
||||
struct Filter {
|
||||
std::string filter_expression;
|
||||
};
|
||||
|
||||
enum class OrderingDirection { ASCENDING = 1, DESCENDING = 2 };
|
||||
|
||||
struct OrderBy {
|
||||
@ -372,21 +368,32 @@ struct ScanVerticesResponse {
|
||||
std::vector<ScanResultRow> results;
|
||||
};
|
||||
|
||||
using VertexOrEdgeIds = std::variant<VertexId, EdgeId>;
|
||||
|
||||
struct GetPropertiesRequest {
|
||||
Hlc transaction_id;
|
||||
// Shouldn't contain mixed vertex and edge ids
|
||||
VertexOrEdgeIds vertex_or_edge_ids;
|
||||
std::vector<PropertyId> property_ids;
|
||||
std::vector<Expression> expressions;
|
||||
bool only_unique = false;
|
||||
std::optional<std::vector<OrderBy>> order_by;
|
||||
std::vector<VertexId> vertex_ids;
|
||||
std::vector<std::pair<VertexId, EdgeId>> vertices_and_edges;
|
||||
|
||||
std::optional<std::vector<PropertyId>> property_ids;
|
||||
std::vector<std::string> expressions;
|
||||
|
||||
std::vector<OrderBy> order_by;
|
||||
std::optional<size_t> limit;
|
||||
std::optional<Filter> filter;
|
||||
|
||||
// Return only the properties of the vertices or edges that the filter predicate
|
||||
// evaluates to true
|
||||
std::optional<std::string> filter;
|
||||
};
|
||||
|
||||
struct GetPropertiesResultRow {
|
||||
VertexId vertex;
|
||||
std::optional<EdgeId> edge;
|
||||
|
||||
std::vector<std::pair<PropertyId, Value>> props;
|
||||
std::vector<Value> evaluated_expressions;
|
||||
};
|
||||
|
||||
struct GetPropertiesResponse {
|
||||
std::vector<GetPropertiesResultRow> result_row;
|
||||
std::optional<ShardError> error;
|
||||
};
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
#include "storage/v3/request_helper.hpp"
|
||||
|
||||
#include <iterator>
|
||||
#include <vector>
|
||||
|
||||
#include "storage/v3/bindings/db_accessor.hpp"
|
||||
@ -220,30 +221,39 @@ std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexA
|
||||
return evaluated_expressions;
|
||||
}
|
||||
|
||||
std::vector<TypedValue> EvaluateEdgeExpressions(DbAccessor &dba, const VertexAccessor &v_acc, const EdgeAccessor &e_acc,
|
||||
const std::vector<std::string> &expressions) {
|
||||
std::vector<TypedValue> evaluated_expressions;
|
||||
evaluated_expressions.reserve(expressions.size());
|
||||
|
||||
std::transform(expressions.begin(), expressions.end(), std::back_inserter(evaluated_expressions),
|
||||
[&dba, &v_acc, &e_acc](const auto &expression) {
|
||||
return ComputeExpression(dba, v_acc, e_acc, expression, expr::identifier_node_symbol,
|
||||
expr::identifier_edge_symbol);
|
||||
});
|
||||
|
||||
return evaluated_expressions;
|
||||
}
|
||||
|
||||
ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view,
|
||||
const Schemas::Schema &schema) {
|
||||
std::map<PropertyId, Value> ret;
|
||||
auto props = acc.Properties(view);
|
||||
if (props.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get vertex properties.");
|
||||
return props.GetError();
|
||||
auto ret = impl::CollectAllPropertiesImpl<VertexAccessor>(acc, view);
|
||||
if (ret.HasError()) {
|
||||
return ret.GetError();
|
||||
}
|
||||
|
||||
auto &properties = props.GetValue();
|
||||
std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()),
|
||||
[](std::pair<const PropertyId, PropertyValue> &pair) {
|
||||
return std::make_pair(pair.first, FromPropertyValueToValue(std::move(pair.second)));
|
||||
});
|
||||
properties.clear();
|
||||
|
||||
auto pks = PrimaryKeysFromAccessor(acc, view, schema);
|
||||
if (pks) {
|
||||
ret.merge(*pks);
|
||||
ret.GetValue().merge(std::move(*pks));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view) {
|
||||
return impl::CollectAllPropertiesImpl(acc, view);
|
||||
}
|
||||
|
||||
EdgeUniquenessFunction InitializeEdgeUniquenessFunction(bool only_unique_neighbor_rows) {
|
||||
// Functions to select connecting edges based on uniquness
|
||||
EdgeUniquenessFunction maybe_filter_based_on_edge_uniquness;
|
||||
@ -350,11 +360,20 @@ EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) {
|
||||
return edge_filler;
|
||||
}
|
||||
|
||||
bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters,
|
||||
const std::string_view node_name) {
|
||||
return std::ranges::all_of(filters, [&node_name, &dba, &v_acc](const auto &filter_expr) {
|
||||
auto res = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, node_name, "");
|
||||
return res.IsBool() && res.ValueBool();
|
||||
bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc,
|
||||
const std::vector<std::string> &filters) {
|
||||
return std::ranges::all_of(filters, [&dba, &v_acc](const auto &filter_expr) {
|
||||
const auto result = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, expr::identifier_node_symbol, "");
|
||||
return result.IsBool() && result.ValueBool();
|
||||
});
|
||||
}
|
||||
|
||||
bool FilterOnEdge(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const EdgeAccessor &e_acc,
|
||||
const std::vector<std::string> &filters) {
|
||||
return std::ranges::all_of(filters, [&dba, &v_acc, &e_acc](const auto &filter_expr) {
|
||||
const auto result =
|
||||
ComputeExpression(dba, v_acc, e_acc, filter_expr, expr::identifier_node_symbol, expr::identifier_edge_symbol);
|
||||
return result.IsBool() && result.ValueBool();
|
||||
});
|
||||
}
|
||||
|
||||
@ -526,4 +545,36 @@ std::vector<Element<EdgeAccessor>> OrderByEdges(DbAccessor &dba, std::vector<Edg
|
||||
return ordered;
|
||||
}
|
||||
|
||||
std::vector<Element<std::pair<VertexAccessor, EdgeAccessor>>> OrderByEdges(
|
||||
DbAccessor &dba, std::vector<EdgeAccessor> &iterable, std::vector<msgs::OrderBy> &order_by_edges,
|
||||
const std::vector<VertexAccessor> &vertex_acc) {
|
||||
MG_ASSERT(vertex_acc.size() == iterable.size());
|
||||
std::vector<Ordering> ordering;
|
||||
ordering.reserve(order_by_edges.size());
|
||||
std::transform(order_by_edges.begin(), order_by_edges.end(), std::back_inserter(ordering),
|
||||
[](const auto &order_by) { return ConvertMsgsOrderByToOrdering(order_by.direction); });
|
||||
|
||||
std::vector<Element<std::pair<VertexAccessor, EdgeAccessor>>> ordered;
|
||||
VertexAccessor current = vertex_acc.front();
|
||||
size_t id = 0;
|
||||
for (auto it = iterable.begin(); it != iterable.end(); it++, id++) {
|
||||
current = vertex_acc[id];
|
||||
std::vector<TypedValue> properties_order_by;
|
||||
properties_order_by.reserve(order_by_edges.size());
|
||||
std::transform(order_by_edges.begin(), order_by_edges.end(), std::back_inserter(properties_order_by),
|
||||
[&dba, it, current](const auto &order_by) {
|
||||
return ComputeExpression(dba, current, *it, order_by.expression.expression,
|
||||
expr::identifier_node_symbol, expr::identifier_edge_symbol);
|
||||
});
|
||||
|
||||
ordered.push_back({std::move(properties_order_by), {current, *it}});
|
||||
}
|
||||
|
||||
auto compare_typed_values = TypedValueVectorCompare(ordering);
|
||||
std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) {
|
||||
return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by);
|
||||
});
|
||||
return ordered;
|
||||
}
|
||||
|
||||
} // namespace memgraph::storage::v3
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "storage/v3/edge_accessor.hpp"
|
||||
#include "storage/v3/expr.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/value_conversions.hpp"
|
||||
#include "storage/v3/vertex_accessor.hpp"
|
||||
#include "utils/template_utils.hpp"
|
||||
|
||||
@ -31,7 +32,7 @@ using EdgeFiller =
|
||||
using msgs::Value;
|
||||
|
||||
template <typename T>
|
||||
concept ObjectAccessor = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor>;
|
||||
concept OrderableObject = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor, std::pair<VertexAccessor, EdgeAccessor>>;
|
||||
|
||||
inline bool TypedValueCompare(const TypedValue &a, const TypedValue &b) {
|
||||
// in ordering null comes after everything else
|
||||
@ -125,7 +126,7 @@ class TypedValueVectorCompare final {
|
||||
std::vector<Ordering> ordering_;
|
||||
};
|
||||
|
||||
template <ObjectAccessor TObjectAccessor>
|
||||
template <OrderableObject TObjectAccessor>
|
||||
struct Element {
|
||||
std::vector<TypedValue> properties_order_by;
|
||||
TObjectAccessor object_acc;
|
||||
@ -167,6 +168,10 @@ std::vector<Element<EdgeAccessor>> OrderByEdges(DbAccessor &dba, std::vector<Edg
|
||||
std::vector<msgs::OrderBy> &order_by_edges,
|
||||
const VertexAccessor &vertex_acc);
|
||||
|
||||
std::vector<Element<std::pair<VertexAccessor, EdgeAccessor>>> OrderByEdges(
|
||||
DbAccessor &dba, std::vector<EdgeAccessor> &iterable, std::vector<msgs::OrderBy> &order_by_edges,
|
||||
const std::vector<VertexAccessor> &vertex_acc);
|
||||
|
||||
VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable,
|
||||
const std::vector<PropertyValue> &primary_key, View view);
|
||||
|
||||
@ -177,19 +182,65 @@ std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIter
|
||||
std::array<std::vector<EdgeAccessor>, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor,
|
||||
msgs::EdgeDirection direction);
|
||||
|
||||
bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters,
|
||||
std::string_view node_name);
|
||||
bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters);
|
||||
|
||||
bool FilterOnEdge(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const EdgeAccessor &e_acc,
|
||||
const std::vector<std::string> &filters);
|
||||
|
||||
std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc,
|
||||
const std::vector<std::string> &expressions,
|
||||
std::string_view node_name);
|
||||
|
||||
ShardResult<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc,
|
||||
std::vector<TypedValue> EvaluateEdgeExpressions(DbAccessor &dba, const VertexAccessor &v_acc, const EdgeAccessor &e_acc,
|
||||
const std::vector<std::string> &expressions);
|
||||
|
||||
template <typename T>
|
||||
concept PropertiesAccessor = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor>;
|
||||
|
||||
template <PropertiesAccessor TAccessor>
|
||||
ShardResult<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const TAccessor &acc,
|
||||
const std::vector<PropertyId> &props,
|
||||
View view);
|
||||
View view) {
|
||||
std::map<PropertyId, Value> ret;
|
||||
|
||||
for (const auto &prop : props) {
|
||||
auto result = acc.GetProperty(prop, view);
|
||||
if (result.HasError()) {
|
||||
spdlog::debug("Encountered an Error while trying to get a vertex property.");
|
||||
return result.GetError();
|
||||
}
|
||||
auto &value = result.GetValue();
|
||||
ret.emplace(std::make_pair(prop, FromPropertyValueToValue(std::move(value))));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view,
|
||||
const Schemas::Schema &schema);
|
||||
namespace impl {
|
||||
template <PropertiesAccessor TAccessor>
|
||||
ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesImpl(const TAccessor &acc, View view) {
|
||||
std::map<PropertyId, Value> ret;
|
||||
auto props = acc.Properties(view);
|
||||
if (props.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get vertex properties.");
|
||||
return props.GetError();
|
||||
}
|
||||
|
||||
auto &properties = props.GetValue();
|
||||
std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()),
|
||||
[](std::pair<const PropertyId, PropertyValue> &pair) {
|
||||
return std::make_pair(pair.first, conversions::FromPropertyValueToValue(std::move(pair.second)));
|
||||
});
|
||||
return ret;
|
||||
}
|
||||
} // namespace impl
|
||||
|
||||
template <PropertiesAccessor TAccessor>
|
||||
ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const TAccessor &acc, View view) {
|
||||
return impl::CollectAllPropertiesImpl<TAccessor>(acc, view);
|
||||
}
|
||||
|
||||
EdgeUniquenessFunction InitializeEdgeUniquenessFunction(bool only_unique_neighbor_rows);
|
||||
|
||||
|
@ -10,12 +10,16 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <algorithm>
|
||||
#include <exception>
|
||||
#include <experimental/source_location>
|
||||
#include <functional>
|
||||
#include <iterator>
|
||||
#include <optional>
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
|
||||
#include "common/errors.hpp"
|
||||
#include "parser/opencypher/parser.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "storage/v2/vertex.hpp"
|
||||
@ -29,6 +33,7 @@
|
||||
#include "storage/v3/bindings/symbol_generator.hpp"
|
||||
#include "storage/v3/bindings/symbol_table.hpp"
|
||||
#include "storage/v3/bindings/typed_value.hpp"
|
||||
#include "storage/v3/conversions.hpp"
|
||||
#include "storage/v3/expr.hpp"
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "storage/v3/key_store.hpp"
|
||||
@ -326,7 +331,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
|
||||
std::vector<Value> expression_results;
|
||||
if (!req.filter_expressions.empty()) {
|
||||
// NOTE - DbAccessor might get removed in the future.
|
||||
const bool eval = FilterOnVertex(dba, vertex, req.filter_expressions, expr::identifier_node_symbol);
|
||||
const bool eval = FilterOnVertex(dba, vertex, req.filter_expressions);
|
||||
if (!eval) {
|
||||
return;
|
||||
}
|
||||
@ -431,7 +436,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
|
||||
}
|
||||
if (!req.filters.empty()) {
|
||||
// NOTE - DbAccessor might get removed in the future.
|
||||
const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol);
|
||||
const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters);
|
||||
if (!eval) {
|
||||
continue;
|
||||
}
|
||||
@ -510,9 +515,191 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) {
|
||||
return msgs::CommitResponse{};
|
||||
};
|
||||
|
||||
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
|
||||
msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest && /*req*/) {
|
||||
return msgs::GetPropertiesResponse{};
|
||||
msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) {
|
||||
if (!req.vertex_ids.empty() && !req.vertices_and_edges.empty()) {
|
||||
auto shard_error = SHARD_ERROR(ErrorCode::NONEXISTENT_OBJECT);
|
||||
auto error = CreateErrorResponse(shard_error, req.transaction_id, "");
|
||||
return msgs::GetPropertiesResponse{.error = {}};
|
||||
}
|
||||
|
||||
auto shard_acc = shard_->Access(req.transaction_id);
|
||||
auto dba = DbAccessor{&shard_acc};
|
||||
const auto view = storage::v3::View::NEW;
|
||||
|
||||
auto transform_props = [](std::map<PropertyId, Value> &&value) {
|
||||
std::vector<std::pair<PropertyId, Value>> result;
|
||||
result.reserve(value.size());
|
||||
for (auto &[id, val] : value) {
|
||||
result.emplace_back(std::make_pair(id, std::move(val)));
|
||||
}
|
||||
return result;
|
||||
};
|
||||
|
||||
auto collect_props = [&req](const VertexAccessor &v_acc,
|
||||
const std::optional<EdgeAccessor> &e_acc) -> ShardResult<std::map<PropertyId, Value>> {
|
||||
if (!req.property_ids) {
|
||||
if (e_acc) {
|
||||
return CollectAllPropertiesFromAccessor(*e_acc, view);
|
||||
}
|
||||
return CollectAllPropertiesFromAccessor(v_acc, view);
|
||||
}
|
||||
|
||||
if (e_acc) {
|
||||
return CollectSpecificPropertiesFromAccessor(*e_acc, *req.property_ids, view);
|
||||
}
|
||||
return CollectSpecificPropertiesFromAccessor(v_acc, *req.property_ids, view);
|
||||
};
|
||||
|
||||
auto find_edge = [](const VertexAccessor &v, msgs::EdgeId e) -> std::optional<EdgeAccessor> {
|
||||
auto in = v.InEdges(view);
|
||||
MG_ASSERT(in.HasValue());
|
||||
for (auto &edge : in.GetValue()) {
|
||||
if (edge.Gid().AsUint() == e.gid) {
|
||||
return edge;
|
||||
}
|
||||
}
|
||||
auto out = v.OutEdges(view);
|
||||
MG_ASSERT(out.HasValue());
|
||||
for (auto &edge : out.GetValue()) {
|
||||
if (edge.Gid().AsUint() == e.gid) {
|
||||
return edge;
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
const auto has_expr_to_evaluate = !req.expressions.empty();
|
||||
auto emplace_result_row =
|
||||
[dba, transform_props, collect_props, has_expr_to_evaluate, &req](
|
||||
const VertexAccessor &v_acc,
|
||||
const std::optional<EdgeAccessor> e_acc) mutable -> ShardResult<msgs::GetPropertiesResultRow> {
|
||||
auto maybe_id = v_acc.Id(view);
|
||||
if (maybe_id.HasError()) {
|
||||
return {maybe_id.GetError()};
|
||||
}
|
||||
const auto &id = maybe_id.GetValue();
|
||||
std::optional<msgs::EdgeId> e_id;
|
||||
if (e_acc) {
|
||||
e_id = msgs::EdgeId{e_acc->Gid().AsUint()};
|
||||
}
|
||||
msgs::VertexId v_id{msgs::Label{id.primary_label}, ConvertValueVector(id.primary_key)};
|
||||
auto maybe_props = collect_props(v_acc, e_acc);
|
||||
if (maybe_props.HasError()) {
|
||||
return {maybe_props.GetError()};
|
||||
}
|
||||
auto props = transform_props(std::move(maybe_props.GetValue()));
|
||||
auto result = msgs::GetPropertiesResultRow{.vertex = std::move(v_id), .edge = e_id, .props = std::move(props)};
|
||||
if (has_expr_to_evaluate) {
|
||||
std::vector<Value> e_results;
|
||||
if (e_acc) {
|
||||
e_results =
|
||||
ConvertToValueVectorFromTypedValueVector(EvaluateEdgeExpressions(dba, v_acc, *e_acc, req.expressions));
|
||||
} else {
|
||||
e_results = ConvertToValueVectorFromTypedValueVector(
|
||||
EvaluateVertexExpressions(dba, v_acc, req.expressions, expr::identifier_node_symbol));
|
||||
}
|
||||
result.evaluated_expressions = std::move(e_results);
|
||||
}
|
||||
return {std::move(result)};
|
||||
};
|
||||
|
||||
auto get_limit = [&req](const auto &elements) {
|
||||
size_t limit = elements.size();
|
||||
if (req.limit && *req.limit < elements.size()) {
|
||||
limit = *req.limit;
|
||||
}
|
||||
return limit;
|
||||
};
|
||||
|
||||
auto collect_response = [get_limit, &req](auto &elements, auto create_result_row) {
|
||||
msgs::GetPropertiesResponse response;
|
||||
const auto limit = get_limit(elements);
|
||||
for (size_t index = 0; index != limit; ++index) {
|
||||
auto result_row = create_result_row(elements[index]);
|
||||
if (result_row.HasError()) {
|
||||
return msgs::GetPropertiesResponse{.error = CreateErrorResponse(result_row.GetError(), req.transaction_id, "")};
|
||||
}
|
||||
response.result_row.push_back(std::move(result_row.GetValue()));
|
||||
}
|
||||
return response;
|
||||
};
|
||||
|
||||
std::vector<VertexAccessor> vertices;
|
||||
std::vector<EdgeAccessor> edges;
|
||||
|
||||
auto parse_and_filter = [dba, &vertices](auto &container, auto projection, auto filter, auto maybe_get_edge) mutable {
|
||||
for (const auto &elem : container) {
|
||||
const auto &[label, pk_v] = projection(elem);
|
||||
auto pk = ConvertPropertyVector(pk_v);
|
||||
auto v_acc = dba.FindVertex(pk, view);
|
||||
if (!v_acc || filter(*v_acc, maybe_get_edge(elem))) {
|
||||
continue;
|
||||
}
|
||||
vertices.push_back(*v_acc);
|
||||
}
|
||||
};
|
||||
auto identity = [](auto &elem) { return elem; };
|
||||
|
||||
auto filter_vertex = [dba, req](const auto &acc, const auto & /*edge*/) mutable {
|
||||
if (!req.filter) {
|
||||
return false;
|
||||
}
|
||||
return !FilterOnVertex(dba, acc, {*req.filter});
|
||||
};
|
||||
|
||||
auto filter_edge = [dba, &edges, &req, find_edge](const auto &acc, const auto &edge) mutable {
|
||||
auto e_acc = find_edge(acc, edge);
|
||||
if (!e_acc) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (req.filter && !FilterOnEdge(dba, acc, *e_acc, {*req.filter})) {
|
||||
return true;
|
||||
}
|
||||
edges.push_back(*e_acc);
|
||||
return false;
|
||||
};
|
||||
|
||||
// Handler logic here
|
||||
if (!req.vertex_ids.empty()) {
|
||||
parse_and_filter(req.vertex_ids, identity, filter_vertex, identity);
|
||||
} else {
|
||||
parse_and_filter(
|
||||
req.vertices_and_edges, [](auto &e) { return e.first; }, filter_edge, [](auto &e) { return e.second; });
|
||||
}
|
||||
|
||||
if (!req.vertex_ids.empty()) {
|
||||
if (!req.order_by.empty()) {
|
||||
auto elements = OrderByVertices(dba, vertices, req.order_by);
|
||||
return collect_response(elements, [emplace_result_row](auto &element) mutable {
|
||||
return emplace_result_row(element.object_acc, std::nullopt);
|
||||
});
|
||||
}
|
||||
return collect_response(vertices,
|
||||
[emplace_result_row](auto &acc) mutable { return emplace_result_row(acc, std::nullopt); });
|
||||
}
|
||||
|
||||
if (!req.order_by.empty()) {
|
||||
auto elements = OrderByEdges(dba, edges, req.order_by, vertices);
|
||||
return collect_response(elements, [emplace_result_row](auto &element) mutable {
|
||||
return emplace_result_row(element.object_acc.first, element.object_acc.second);
|
||||
});
|
||||
}
|
||||
|
||||
struct ZipView {
|
||||
ZipView(std::vector<VertexAccessor> &v, std::vector<EdgeAccessor> &e) : v(v), e(e) {}
|
||||
size_t size() const { return v.size(); }
|
||||
auto operator[](size_t index) { return std::make_pair(v[index], e[index]); }
|
||||
|
||||
private:
|
||||
std::vector<VertexAccessor> &v;
|
||||
std::vector<EdgeAccessor> &e;
|
||||
};
|
||||
|
||||
ZipView vertices_and_edges(vertices, edges);
|
||||
return collect_response(vertices_and_edges, [emplace_result_row](const auto &acc) mutable {
|
||||
return emplace_result_row(acc.first, acc.second);
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace memgraph::storage::v3
|
||||
|
@ -32,3 +32,4 @@ add_simulation_test(trial_query_storage/query_storage_test.cpp)
|
||||
add_simulation_test(sharded_map.cpp)
|
||||
add_simulation_test(shard_rsm.cpp)
|
||||
add_simulation_test(cluster_property_test.cpp)
|
||||
add_simulation_test(request_router.cpp)
|
||||
|
@ -76,14 +76,10 @@ class MockedShardRsm {
|
||||
using WriteRequests = msgs::WriteRequests;
|
||||
using WriteResponses = msgs::WriteResponses;
|
||||
|
||||
// ExpandOneResponse Read(ExpandOneRequest rqst);
|
||||
// GetPropertiesResponse Read(GetPropertiesRequest rqst);
|
||||
msgs::ScanVerticesResponse ReadImpl(msgs::ScanVerticesRequest rqst) {
|
||||
msgs::ScanVerticesResponse ret;
|
||||
auto as_prop_val = storage::conversions::ConvertPropertyVector(rqst.start_id.second);
|
||||
if (!IsKeyInRange(as_prop_val)) {
|
||||
ret.success = false;
|
||||
} else if (as_prop_val == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) {
|
||||
if (as_prop_val == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) {
|
||||
msgs::Value val(int64_t(0));
|
||||
ret.next_start_id = std::make_optional<msgs::VertexId>();
|
||||
ret.next_start_id->second =
|
||||
@ -91,37 +87,46 @@ class MockedShardRsm {
|
||||
msgs::ScanResultRow result;
|
||||
result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val));
|
||||
ret.results.push_back(std::move(result));
|
||||
ret.success = true;
|
||||
} else if (as_prop_val == ShardRsmKey{PropertyValue(1), PropertyValue(0)}) {
|
||||
msgs::ScanResultRow result;
|
||||
msgs::Value val(int64_t(1));
|
||||
result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val));
|
||||
ret.results.push_back(std::move(result));
|
||||
ret.success = true;
|
||||
} else if (as_prop_val == ShardRsmKey{PropertyValue(12), PropertyValue(13)}) {
|
||||
msgs::ScanResultRow result;
|
||||
msgs::Value val(int64_t(444));
|
||||
result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val));
|
||||
ret.results.push_back(std::move(result));
|
||||
ret.success = true;
|
||||
} else {
|
||||
ret.success = false;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
msgs::ExpandOneResponse ReadImpl(msgs::ExpandOneRequest rqst) { return {}; }
|
||||
msgs::ExpandOneResponse ReadImpl(msgs::GetPropertiesRequest rqst) { return {}; }
|
||||
msgs::GetPropertiesResponse ReadImpl(msgs::GetPropertiesRequest rqst) {
|
||||
msgs::GetPropertiesResponse resp;
|
||||
auto &vertices = rqst.vertex_ids;
|
||||
for (auto &vertex : vertices) {
|
||||
auto as_prop_val = storage::conversions::ConvertPropertyVector(vertex.second);
|
||||
if (as_prop_val == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) {
|
||||
resp.result_row.push_back(msgs::GetPropertiesResultRow{.vertex = std::move(vertex)});
|
||||
} else if (as_prop_val == ShardRsmKey{PropertyValue(1), PropertyValue(0)}) {
|
||||
resp.result_row.push_back(msgs::GetPropertiesResultRow{.vertex = std::move(vertex)});
|
||||
} else if (as_prop_val == ShardRsmKey{PropertyValue(13), PropertyValue(13)}) {
|
||||
resp.result_row.push_back(msgs::GetPropertiesResultRow{.vertex = std::move(vertex)});
|
||||
}
|
||||
}
|
||||
return resp;
|
||||
}
|
||||
|
||||
ReadResponses Read(ReadRequests read_requests) {
|
||||
return {std::visit([this]<typename T>(T &&request) { return ReadResponses{ReadImpl(std::forward<T>(request))}; },
|
||||
std::move(read_requests))};
|
||||
}
|
||||
|
||||
msgs::CreateVerticesResponse ApplyImpl(msgs::CreateVerticesRequest rqst) { return {.success = true}; }
|
||||
msgs::CreateVerticesResponse ApplyImpl(msgs::CreateVerticesRequest rqst) { return {}; }
|
||||
msgs::DeleteVerticesResponse ApplyImpl(msgs::DeleteVerticesRequest rqst) { return {}; }
|
||||
msgs::UpdateVerticesResponse ApplyImpl(msgs::UpdateVerticesRequest rqst) { return {}; }
|
||||
msgs::CreateExpandResponse ApplyImpl(msgs::CreateExpandRequest rqst) { return {.success = true}; }
|
||||
msgs::CreateExpandResponse ApplyImpl(msgs::CreateExpandRequest rqst) { return {}; }
|
||||
msgs::DeleteEdgesResponse ApplyImpl(msgs::DeleteEdgesRequest rqst) { return {}; }
|
||||
msgs::UpdateEdgesResponse ApplyImpl(msgs::UpdateEdgesRequest rqst) { return {}; }
|
||||
msgs::CommitResponse ApplyImpl(msgs::CommitRequest rqst) { return {}; }
|
||||
|
@ -152,9 +152,7 @@ void RunStorageRaft(Raft<IoImpl, MockedShardRsm, WriteRequests, WriteResponses,
|
||||
}
|
||||
|
||||
void TestScanVertices(query::v2::RequestRouterInterface &request_router) {
|
||||
msgs::ExecutionState<ScanVerticesRequest> state{.label = "test_label"};
|
||||
|
||||
auto result = request_router.Request(state);
|
||||
auto result = request_router.ScanVertices("test_label");
|
||||
MG_ASSERT(result.size() == 2);
|
||||
{
|
||||
auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0));
|
||||
@ -162,18 +160,10 @@ void TestScanVertices(query::v2::RequestRouterInterface &request_router) {
|
||||
prop = result[1].GetProperty(msgs::PropertyId::FromUint(0));
|
||||
MG_ASSERT(prop.int_v == 444);
|
||||
}
|
||||
|
||||
result = request_router.Request(state);
|
||||
{
|
||||
MG_ASSERT(result.size() == 1);
|
||||
auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0));
|
||||
MG_ASSERT(prop.int_v == 1);
|
||||
}
|
||||
}
|
||||
|
||||
void TestCreateVertices(query::v2::RequestRouterInterface &request_router) {
|
||||
using PropVal = msgs::Value;
|
||||
msgs::ExecutionState<CreateVerticesRequest> state;
|
||||
std::vector<msgs::NewVertex> new_vertices;
|
||||
auto label_id = request_router.NameToLabel("test_label");
|
||||
msgs::NewVertex a1{.primary_key = {PropVal(int64_t(1)), PropVal(int64_t(0))}};
|
||||
@ -183,13 +173,13 @@ void TestCreateVertices(query::v2::RequestRouterInterface &request_router) {
|
||||
new_vertices.push_back(std::move(a1));
|
||||
new_vertices.push_back(std::move(a2));
|
||||
|
||||
auto result = request_router.Request(state, std::move(new_vertices));
|
||||
auto result = request_router.CreateVertices(std::move(new_vertices));
|
||||
MG_ASSERT(result.size() == 2);
|
||||
}
|
||||
|
||||
void TestCreateExpand(query::v2::RequestRouterInterface &request_router) {
|
||||
using PropVal = msgs::Value;
|
||||
msgs::ExecutionState<msgs::CreateExpandRequest> state;
|
||||
msgs::CreateExpandRequest state;
|
||||
std::vector<msgs::NewExpand> new_expands;
|
||||
|
||||
const auto edge_type_id = request_router.NameToEdgeType("edge_type");
|
||||
@ -203,24 +193,42 @@ void TestCreateExpand(query::v2::RequestRouterInterface &request_router) {
|
||||
new_expands.push_back(std::move(expand_1));
|
||||
new_expands.push_back(std::move(expand_2));
|
||||
|
||||
auto responses = request_router.Request(state, std::move(new_expands));
|
||||
auto responses = request_router.CreateExpand(std::move(new_expands));
|
||||
MG_ASSERT(responses.size() == 2);
|
||||
MG_ASSERT(responses[0].success);
|
||||
MG_ASSERT(responses[1].success);
|
||||
MG_ASSERT(!responses[0].error);
|
||||
MG_ASSERT(!responses[1].error);
|
||||
}
|
||||
|
||||
void TestExpandOne(query::v2::RequestRouterInterface &request_router) {
|
||||
msgs::ExecutionState<msgs::ExpandOneRequest> state{};
|
||||
msgs::ExpandOneRequest state{};
|
||||
msgs::ExpandOneRequest request;
|
||||
const auto edge_type_id = request_router.NameToEdgeType("edge_type");
|
||||
const auto label = msgs::Label{request_router.NameToLabel("test_label")};
|
||||
request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}});
|
||||
request.edge_types.push_back(msgs::EdgeType{edge_type_id});
|
||||
request.direction = msgs::EdgeDirection::BOTH;
|
||||
auto result_rows = request_router.Request(state, std::move(request));
|
||||
auto result_rows = request_router.ExpandOne(std::move(request));
|
||||
MG_ASSERT(result_rows.size() == 2);
|
||||
}
|
||||
|
||||
void TestGetProperties(query::v2::RequestRouterInterface &request_router) {
|
||||
using PropVal = msgs::Value;
|
||||
|
||||
auto label_id = request_router.NameToLabel("test_label");
|
||||
msgs::VertexId v0{{label_id}, {PropVal(int64_t(0)), PropVal(int64_t(0))}};
|
||||
msgs::VertexId v1{{label_id}, {PropVal(int64_t(1)), PropVal(int64_t(0))}};
|
||||
msgs::VertexId v2{{label_id}, {PropVal(int64_t(13)), PropVal(int64_t(13))}};
|
||||
|
||||
msgs::GetPropertiesRequest request;
|
||||
|
||||
request.vertex_ids.push_back({v0});
|
||||
request.vertex_ids.push_back({v1});
|
||||
request.vertex_ids.push_back({v2});
|
||||
|
||||
auto result = request_router.GetProperties(std::move(request));
|
||||
MG_ASSERT(result.size() == 3);
|
||||
}
|
||||
|
||||
template <typename RequestRouter>
|
||||
void TestAggregate(RequestRouter &request_router) {}
|
||||
|
||||
@ -343,6 +351,7 @@ void DoTest() {
|
||||
TestScanVertices(request_router);
|
||||
TestCreateVertices(request_router);
|
||||
TestCreateExpand(request_router);
|
||||
TestGetProperties(request_router);
|
||||
|
||||
simulator.ShutDown();
|
||||
|
||||
|
@ -480,6 +480,65 @@ std::tuple<size_t, std::optional<msgs::VertexId>> AttemptToScanAllWithExpression
|
||||
}
|
||||
}
|
||||
|
||||
msgs::GetPropertiesResponse AttemptToGetProperties(
|
||||
ShardClient &client, std::optional<std::vector<PropertyId>> properties, std::vector<msgs::VertexId> vertices,
|
||||
std::vector<msgs::EdgeId> edges, std::optional<size_t> limit = std::nullopt,
|
||||
std::optional<uint64_t> filter_prop = std::nullopt, bool edge = false,
|
||||
std::optional<std::string> order_by = std::nullopt) {
|
||||
msgs::GetPropertiesRequest req{};
|
||||
req.transaction_id.logical_id = GetTransactionId();
|
||||
req.property_ids = std::move(properties);
|
||||
|
||||
if (filter_prop) {
|
||||
std::string filter_expr = (!edge) ? "MG_SYMBOL_NODE.prop1 >= " : "MG_SYMBOL_EDGE.e_prop = ";
|
||||
filter_expr += std::to_string(*filter_prop);
|
||||
req.filter = std::make_optional(std::move(filter_expr));
|
||||
}
|
||||
if (order_by) {
|
||||
std::string filter_expr = (!edge) ? "MG_SYMBOL_NODE." : "MG_SYMBOL_EDGE.";
|
||||
filter_expr += *order_by;
|
||||
msgs::OrderBy order_by{.expression = {std::move(filter_expr)}, .direction = msgs::OrderingDirection::DESCENDING};
|
||||
std::vector<msgs::OrderBy> request_order_by;
|
||||
request_order_by.push_back(std::move(order_by));
|
||||
req.order_by = std::move(request_order_by);
|
||||
}
|
||||
if (limit) {
|
||||
req.limit = limit;
|
||||
}
|
||||
req.expressions = {std::string("5 = 5")};
|
||||
std::vector<msgs::VertexId> req_v;
|
||||
std::vector<msgs::EdgeId> req_e;
|
||||
for (auto &v : vertices) {
|
||||
req_v.push_back(std::move(v));
|
||||
}
|
||||
for (auto &e : edges) {
|
||||
req_e.push_back(std::move(e));
|
||||
}
|
||||
|
||||
if (!edges.empty()) {
|
||||
MG_ASSERT(edges.size() == vertices.size());
|
||||
size_t id = 0;
|
||||
req.vertices_and_edges.reserve(req_v.size());
|
||||
for (auto &v : req_v) {
|
||||
req.vertices_and_edges.push_back({std::move(v), std::move(req_e[id++])});
|
||||
}
|
||||
} else {
|
||||
req.vertex_ids = std::move(req_v);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
auto read_res = client.SendReadRequest(req);
|
||||
if (read_res.HasError()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto write_response_result = read_res.GetValue();
|
||||
auto write_response = std::get<msgs::GetPropertiesResponse>(write_response_result);
|
||||
|
||||
return write_response;
|
||||
}
|
||||
}
|
||||
|
||||
void AttemptToScanAllWithOrderByOnPrimaryProperty(ShardClient &client, msgs::VertexId start_id, uint64_t batch_limit) {
|
||||
msgs::ScanVerticesRequest scan_req;
|
||||
scan_req.batch_limit = batch_limit;
|
||||
@ -1204,6 +1263,205 @@ void TestExpandOneGraphTwo(ShardClient &client) {
|
||||
}
|
||||
}
|
||||
|
||||
void TestGetProperties(ShardClient &client) {
|
||||
const auto unique_prop_val_1 = GetUniqueInteger();
|
||||
const auto unique_prop_val_2 = GetUniqueInteger();
|
||||
const auto unique_prop_val_3 = GetUniqueInteger();
|
||||
const auto unique_prop_val_4 = GetUniqueInteger();
|
||||
const auto unique_prop_val_5 = GetUniqueInteger();
|
||||
|
||||
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_1));
|
||||
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2));
|
||||
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_3));
|
||||
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_4));
|
||||
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_5));
|
||||
|
||||
const msgs::Label prim_label = {.id = get_primary_label()};
|
||||
const msgs::PrimaryKey prim_key = {msgs::Value(static_cast<int64_t>(unique_prop_val_1))};
|
||||
const msgs::VertexId v_id = {prim_label, prim_key};
|
||||
const msgs::PrimaryKey prim_key_2 = {msgs::Value(static_cast<int64_t>(unique_prop_val_2))};
|
||||
const msgs::VertexId v_id_2 = {prim_label, prim_key_2};
|
||||
const msgs::PrimaryKey prim_key_3 = {msgs::Value(static_cast<int64_t>(unique_prop_val_3))};
|
||||
const msgs::VertexId v_id_3 = {prim_label, prim_key_3};
|
||||
const msgs::PrimaryKey prim_key_4 = {msgs::Value(static_cast<int64_t>(unique_prop_val_4))};
|
||||
const msgs::VertexId v_id_4 = {prim_label, prim_key_4};
|
||||
const msgs::PrimaryKey prim_key_5 = {msgs::Value(static_cast<int64_t>(unique_prop_val_5))};
|
||||
const msgs::VertexId v_id_5 = {prim_label, prim_key_5};
|
||||
const auto prop_id_2 = PropertyId::FromUint(2);
|
||||
const auto prop_id_4 = PropertyId::FromUint(4);
|
||||
const auto prop_id_5 = PropertyId::FromUint(5);
|
||||
// No properties
|
||||
{
|
||||
const auto result = AttemptToGetProperties(client, {{}}, {v_id, v_id_2}, {});
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 2);
|
||||
for (const auto &elem : result.result_row) {
|
||||
MG_ASSERT(elem.props.size() == 0);
|
||||
}
|
||||
}
|
||||
// All properties
|
||||
{
|
||||
const auto result = AttemptToGetProperties(client, std::nullopt, {v_id, v_id_2}, {});
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 2);
|
||||
for (const auto &elem : result.result_row) {
|
||||
MG_ASSERT(elem.props.size() == 3);
|
||||
}
|
||||
}
|
||||
{
|
||||
// Specific properties
|
||||
const auto result =
|
||||
AttemptToGetProperties(client, std::vector{prop_id_2, prop_id_4, prop_id_5}, {v_id, v_id_2, v_id_3}, {});
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(!result.result_row.empty());
|
||||
MG_ASSERT(result.result_row.size() == 3);
|
||||
for (const auto &elem : result.result_row) {
|
||||
MG_ASSERT(elem.props.size() == 3);
|
||||
}
|
||||
}
|
||||
{
|
||||
// Two properties from two vertices with a filter on unique_prop_5
|
||||
const auto result = AttemptToGetProperties(client, std::vector{prop_id_2, prop_id_4}, {v_id, v_id_2, v_id_5}, {},
|
||||
std::nullopt, unique_prop_val_5);
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 1);
|
||||
}
|
||||
{
|
||||
// One property from three vertices.
|
||||
const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3}, {});
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 3);
|
||||
MG_ASSERT(result.result_row[0].props.size() == 1);
|
||||
MG_ASSERT(result.result_row[1].props.size() == 1);
|
||||
MG_ASSERT(result.result_row[2].props.size() == 1);
|
||||
}
|
||||
{
|
||||
// Same as before but with limit of 1 row
|
||||
const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3}, {},
|
||||
std::make_optional<size_t>(1));
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 1);
|
||||
}
|
||||
{
|
||||
// Same as before but with a limit greater than the elements returned
|
||||
const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, std::vector{v_id, v_id_2, v_id_3}, {},
|
||||
std::make_optional<size_t>(5));
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 3);
|
||||
}
|
||||
{
|
||||
// Order by on `prop1` (descending)
|
||||
const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::nullopt,
|
||||
std::nullopt, false, "prop1");
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 3);
|
||||
MG_ASSERT(result.result_row[0].vertex == v_id_3);
|
||||
MG_ASSERT(result.result_row[1].vertex == v_id_2);
|
||||
MG_ASSERT(result.result_row[2].vertex == v_id);
|
||||
}
|
||||
{
|
||||
// Order by and filter on >= unique_prop_val_3 && assert result row data members
|
||||
const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3, v_id_4, v_id_5},
|
||||
{}, std::nullopt, unique_prop_val_3, false, "prop1");
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 3);
|
||||
MG_ASSERT(result.result_row[0].vertex == v_id_5);
|
||||
MG_ASSERT(result.result_row[0].props.size() == 1);
|
||||
MG_ASSERT(result.result_row[0].props.front().second == prim_key_5.front());
|
||||
MG_ASSERT(result.result_row[0].props.size() == 1);
|
||||
MG_ASSERT(result.result_row[0].props.front().first == prop_id_2);
|
||||
MG_ASSERT(result.result_row[0].evaluated_expressions.size() == 1);
|
||||
MG_ASSERT(result.result_row[0].evaluated_expressions.front() == msgs::Value(true));
|
||||
|
||||
MG_ASSERT(result.result_row[1].vertex == v_id_4);
|
||||
MG_ASSERT(result.result_row[1].props.size() == 1);
|
||||
MG_ASSERT(result.result_row[1].props.front().second == prim_key_4.front());
|
||||
MG_ASSERT(result.result_row[1].props.size() == 1);
|
||||
MG_ASSERT(result.result_row[1].props.front().first == prop_id_2);
|
||||
MG_ASSERT(result.result_row[1].evaluated_expressions.size() == 1);
|
||||
MG_ASSERT(result.result_row[1].evaluated_expressions.front() == msgs::Value(true));
|
||||
|
||||
MG_ASSERT(result.result_row[2].vertex == v_id_3);
|
||||
MG_ASSERT(result.result_row[2].props.size() == 1);
|
||||
MG_ASSERT(result.result_row[2].props.front().second == prim_key_3.front());
|
||||
MG_ASSERT(result.result_row[2].props.size() == 1);
|
||||
MG_ASSERT(result.result_row[2].props.front().first == prop_id_2);
|
||||
MG_ASSERT(result.result_row[2].evaluated_expressions.size() == 1);
|
||||
MG_ASSERT(result.result_row[2].evaluated_expressions.front() == msgs::Value(true));
|
||||
}
|
||||
|
||||
// Edges
|
||||
const auto edge_gid = GetUniqueInteger();
|
||||
const auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger());
|
||||
const auto unique_edge_prop_id = 7;
|
||||
const auto edge_prop_val = GetUniqueInteger();
|
||||
MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_2, edge_gid, unique_edge_prop_id,
|
||||
edge_prop_val, {edge_type_id}));
|
||||
const auto edge_gid_2 = GetUniqueInteger();
|
||||
const auto edge_prop_val_2 = GetUniqueInteger();
|
||||
MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_3, unique_prop_val_4, edge_gid_2,
|
||||
unique_edge_prop_id, edge_prop_val_2, {edge_type_id}));
|
||||
const auto edge_prop_id = PropertyId::FromUint(unique_edge_prop_id);
|
||||
std::vector<msgs::EdgeId> edge_ids = {{edge_gid}, {edge_gid_2}};
|
||||
// No properties
|
||||
{
|
||||
const auto result = AttemptToGetProperties(client, {{}}, {v_id_2, v_id_3}, edge_ids);
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 2);
|
||||
for (const auto &elem : result.result_row) {
|
||||
MG_ASSERT(elem.props.size() == 0);
|
||||
}
|
||||
}
|
||||
// All properties
|
||||
{
|
||||
const auto result = AttemptToGetProperties(client, std::nullopt, {v_id_2, v_id_3}, edge_ids);
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 2);
|
||||
for (const auto &elem : result.result_row) {
|
||||
MG_ASSERT(elem.props.size() == 1);
|
||||
}
|
||||
}
|
||||
// Properties for two vertices
|
||||
{
|
||||
const auto result = AttemptToGetProperties(client, std::vector{edge_prop_id}, {v_id_2, v_id_3}, edge_ids);
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 2);
|
||||
}
|
||||
// Filter
|
||||
{
|
||||
const auto result = AttemptToGetProperties(client, std::vector{edge_prop_id}, {v_id_2, v_id_3}, edge_ids, {},
|
||||
{edge_prop_val}, true);
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 1);
|
||||
MG_ASSERT(result.result_row.front().edge);
|
||||
MG_ASSERT(result.result_row.front().edge.value().gid == edge_gid);
|
||||
MG_ASSERT(result.result_row.front().props.size() == 1);
|
||||
MG_ASSERT(result.result_row.front().props.front().second == msgs::Value(static_cast<int64_t>(edge_prop_val)));
|
||||
}
|
||||
// Order by
|
||||
{
|
||||
const auto result =
|
||||
AttemptToGetProperties(client, std::vector{edge_prop_id}, {v_id_2, v_id_3}, edge_ids, {}, {}, true, "e_prop");
|
||||
MG_ASSERT(!result.error);
|
||||
MG_ASSERT(result.result_row.size() == 2);
|
||||
MG_ASSERT(result.result_row[0].vertex == v_id_3);
|
||||
MG_ASSERT(result.result_row[0].edge);
|
||||
MG_ASSERT(result.result_row[0].edge.value().gid == edge_gid_2);
|
||||
MG_ASSERT(result.result_row[0].props.size() == 1);
|
||||
MG_ASSERT(result.result_row[0].props.front().second == msgs::Value(static_cast<int64_t>(edge_prop_val_2)));
|
||||
MG_ASSERT(result.result_row[0].evaluated_expressions.size() == 1);
|
||||
MG_ASSERT(result.result_row[0].evaluated_expressions.front() == msgs::Value(true));
|
||||
|
||||
MG_ASSERT(result.result_row[1].vertex == v_id_2);
|
||||
MG_ASSERT(result.result_row[1].edge);
|
||||
MG_ASSERT(result.result_row[1].edge.value().gid == edge_gid);
|
||||
MG_ASSERT(result.result_row[1].props.size() == 1);
|
||||
MG_ASSERT(result.result_row[1].props.front().second == msgs::Value(static_cast<int64_t>(edge_prop_val)));
|
||||
MG_ASSERT(result.result_row[1].evaluated_expressions.size() == 1);
|
||||
MG_ASSERT(result.result_row[1].evaluated_expressions.front() == msgs::Value(true));
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
int TestMessages() {
|
||||
@ -1242,9 +1500,12 @@ int TestMessages() {
|
||||
auto shard_ptr2 = std::make_unique<Shard>(get_primary_label(), min_prim_key, max_prim_key, schema_prop);
|
||||
auto shard_ptr3 = std::make_unique<Shard>(get_primary_label(), min_prim_key, max_prim_key, schema_prop);
|
||||
|
||||
shard_ptr1->StoreMapping({{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}});
|
||||
shard_ptr2->StoreMapping({{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}});
|
||||
shard_ptr3->StoreMapping({{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}});
|
||||
shard_ptr1->StoreMapping(
|
||||
{{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}, {7, "e_prop"}});
|
||||
shard_ptr2->StoreMapping(
|
||||
{{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}, {7, "e_prop"}});
|
||||
shard_ptr3->StoreMapping(
|
||||
{{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}, {7, "e_prop"}});
|
||||
|
||||
std::vector<Address> address_for_1{shard_server_2_address, shard_server_3_address};
|
||||
std::vector<Address> address_for_2{shard_server_1_address, shard_server_3_address};
|
||||
@ -1286,6 +1547,8 @@ int TestMessages() {
|
||||
TestExpandOneGraphOne(client);
|
||||
TestExpandOneGraphTwo(client);
|
||||
|
||||
// GetProperties tests
|
||||
TestGetProperties(client);
|
||||
simulator.ShutDown();
|
||||
|
||||
SimulatorStats stats = simulator.Stats();
|
||||
|
@ -51,6 +51,8 @@ using memgraph::msgs::CreateVerticesResponse;
|
||||
using memgraph::msgs::ExpandOneRequest;
|
||||
using memgraph::msgs::ExpandOneResponse;
|
||||
using memgraph::msgs::ExpandOneResultRow;
|
||||
using memgraph::msgs::GetPropertiesRequest;
|
||||
using memgraph::msgs::GetPropertiesResultRow;
|
||||
using memgraph::msgs::NewExpand;
|
||||
using memgraph::msgs::NewVertex;
|
||||
using memgraph::msgs::ScanVerticesRequest;
|
||||
@ -92,6 +94,8 @@ class MockedRequestRouter : public RequestRouterInterface {
|
||||
|
||||
std::vector<CreateExpandResponse> CreateExpand(std::vector<NewExpand> new_edges) override { return {}; }
|
||||
|
||||
std::vector<GetPropertiesResultRow> GetProperties(GetPropertiesRequest rqst) override { return {}; }
|
||||
|
||||
const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override {
|
||||
return properties_.IdToName(id.AsUint());
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user