Added prototype for distributed ScanAll and CreateVertices cursors.
Changed ScanAll request return type to be friendlier for use.
This commit is contained in:
parent
b5f8060c1f
commit
c182cf8384
@ -10,6 +10,7 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "query/v2/accessors.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
|
||||
namespace memgraph::query::v2::accessors {
|
||||
EdgeAccessor::EdgeAccessor(Edge edge, std::map<std::string, Value> props)
|
||||
@ -35,7 +36,7 @@ VertexAccessor EdgeAccessor::To() const { return VertexAccessor(Vertex{edge.dst}
|
||||
|
||||
VertexAccessor EdgeAccessor::From() const { return VertexAccessor(Vertex{edge.src}, {}); }
|
||||
|
||||
VertexAccessor::VertexAccessor(Vertex v, std::map<std::string, Value> props)
|
||||
VertexAccessor::VertexAccessor(Vertex v, std::map<requests::PropertyId, Value> props)
|
||||
: vertex(std::move(v)), properties(std::move(props)) {}
|
||||
|
||||
std::vector<Label> VertexAccessor::Labels() const { return vertex.labels; }
|
||||
@ -45,7 +46,7 @@ bool VertexAccessor::HasLabel(Label &label) const {
|
||||
[label](const auto &l) { return l.id == label.id; }) != vertex.labels.end();
|
||||
}
|
||||
|
||||
std::map<std::string, Value> VertexAccessor::Properties() const {
|
||||
std::map<requests::PropertyId, Value> VertexAccessor::Properties() const {
|
||||
// std::map<std::string, TypedValue> res;
|
||||
// for (const auto &[name, value] : *properties) {
|
||||
// res[name] = ValueToTypedValue(value);
|
||||
@ -54,9 +55,17 @@ std::map<std::string, Value> VertexAccessor::Properties() const {
|
||||
return properties;
|
||||
}
|
||||
|
||||
Value VertexAccessor::GetProperty(requests::PropertyId prop_id) const {
|
||||
MG_ASSERT(properties.contains(prop_id));
|
||||
return Value(properties[prop_id]);
|
||||
// return ValueToTypedValue(properties[prop_name]);
|
||||
}
|
||||
|
||||
Value VertexAccessor::GetProperty(const std::string &prop_name) const {
|
||||
MG_ASSERT(properties.contains(prop_name));
|
||||
return Value(properties[prop_name]);
|
||||
// TODO(kostasrim) Add string mapping
|
||||
auto prop_id = requests::PropertyId::FromUint(0);
|
||||
MG_ASSERT(properties.contains(prop_id));
|
||||
return Value(properties[prop_id]);
|
||||
// return ValueToTypedValue(properties[prop_name]);
|
||||
}
|
||||
|
||||
|
@ -62,14 +62,16 @@ class EdgeAccessor final {
|
||||
|
||||
class VertexAccessor final {
|
||||
public:
|
||||
VertexAccessor(Vertex v, std::map<std::string, Value> props);
|
||||
using PropertyId = requests::PropertyId;
|
||||
VertexAccessor(Vertex v, std::map<PropertyId, Value> props);
|
||||
|
||||
std::vector<Label> Labels() const;
|
||||
|
||||
bool HasLabel(Label &label) const;
|
||||
|
||||
std::map<std::string, Value> Properties() const;
|
||||
std::map<PropertyId, Value> Properties() const;
|
||||
|
||||
Value GetProperty(PropertyId prop_name) const;
|
||||
Value GetProperty(const std::string &prop_name) const;
|
||||
|
||||
// Dummy function
|
||||
@ -122,7 +124,7 @@ class VertexAccessor final {
|
||||
|
||||
private:
|
||||
Vertex vertex;
|
||||
mutable std::map<std::string, Value> properties;
|
||||
mutable std::map<PropertyId, Value> properties;
|
||||
};
|
||||
|
||||
// inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "query/v2/parameters.hpp"
|
||||
#include "query/v2/plan/profile.hpp"
|
||||
//#include "query/v2/trigger.hpp"
|
||||
#include "query/v2/shard_request_manager.hpp"
|
||||
#include "utils/async_timer.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
@ -72,6 +73,7 @@ struct ExecutionContext {
|
||||
ExecutionStats execution_stats;
|
||||
// TriggerContextCollector *trigger_context_collector{nullptr};
|
||||
utils::AsyncTimer timer;
|
||||
std::unique_ptr<requests::ShardRequestManagerInterface> shard_request_manager{nullptr};
|
||||
};
|
||||
|
||||
static_assert(std::is_move_assignable_v<ExecutionContext>, "ExecutionContext must be move assignable!");
|
||||
|
@ -391,34 +391,36 @@ TypedValue Last(const TypedValue *args, int64_t nargs, const FunctionContext &ct
|
||||
TypedValue Properties(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
|
||||
FType<Or<Null, Vertex, Edge>>("properties", args, nargs);
|
||||
auto *dba = ctx.db_accessor;
|
||||
auto get_properties = [&](const auto &record_accessor) {
|
||||
TypedValue::TMap properties(ctx.memory);
|
||||
auto props = record_accessor.Properties();
|
||||
// add error handling
|
||||
// if (maybe_props.HasError()) {
|
||||
// switch (maybe_props.GetError()) {
|
||||
// case storage::v3::Error::DELETED_OBJECT:
|
||||
// throw QueryRuntimeException("Trying to get properties from a deleted object.");
|
||||
// case storage::v3::Error::NONEXISTENT_OBJECT:
|
||||
// throw query::v2::QueryRuntimeException("Trying to get properties from an object that doesn't exist.");
|
||||
// case storage::v3::Error::SERIALIZATION_ERROR:
|
||||
// case storage::v3::Error::VERTEX_HAS_EDGES:
|
||||
// case storage::v3::Error::PROPERTIES_DISABLED:
|
||||
// throw QueryRuntimeException("Unexpected error when getting properties.");
|
||||
// }
|
||||
for (const auto &property : props) {
|
||||
properties.emplace(property.first, ValueToTypedValue(property.second));
|
||||
}
|
||||
return TypedValue(std::move(properties));
|
||||
};
|
||||
const auto &value = args[0];
|
||||
if (value.IsNull()) {
|
||||
return TypedValue(ctx.memory);
|
||||
} else if (value.IsVertex()) {
|
||||
return get_properties(value.ValueVertex());
|
||||
} else {
|
||||
return get_properties(value.ValueEdge());
|
||||
}
|
||||
// auto get_properties = [&](const auto &record_accessor) {
|
||||
// TypedValue::TMap properties(ctx.memory);
|
||||
// auto props = record_accessor.Properties();
|
||||
// // add error handling
|
||||
// // if (maybe_props.HasError()) {
|
||||
// // switch (maybe_props.GetError()) {
|
||||
// // case storage::v3::Error::DELETED_OBJECT:
|
||||
// // throw QueryRuntimeException("Trying to get properties from a deleted object.");
|
||||
// // case storage::v3::Error::NONEXISTENT_OBJECT:
|
||||
// // throw query::v2::QueryRuntimeException("Trying to get properties from an object that doesn't
|
||||
// exist.");
|
||||
// // case storage::v3::Error::SERIALIZATION_ERROR:
|
||||
// // case storage::v3::Error::VERTEX_HAS_EDGES:
|
||||
// // case storage::v3::Error::PROPERTIES_DISABLED:
|
||||
// // throw QueryRuntimeException("Unexpected error when getting properties.");
|
||||
// // }
|
||||
// for (const auto &property : props) {
|
||||
// properties.emplace(property.first, ValueToTypedValue(property.second));
|
||||
// }
|
||||
// return TypedValue(std::move(properties));
|
||||
// };
|
||||
// const auto &value = args[0];
|
||||
// if (value.IsNull()) {
|
||||
// return TypedValue(ctx.memory);
|
||||
// } else if (value.IsVertex()) {
|
||||
// return get_properties(value.ValueVertex());
|
||||
// } else {
|
||||
// return get_properties(value.ValueEdge());
|
||||
// }
|
||||
return {};
|
||||
}
|
||||
|
||||
TypedValue Size(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
|
||||
@ -598,34 +600,35 @@ TypedValue ValueType(const TypedValue *args, int64_t nargs, const FunctionContex
|
||||
TypedValue Keys(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
|
||||
FType<Or<Null, Vertex, Edge>>("keys", args, nargs);
|
||||
auto *dba = ctx.db_accessor;
|
||||
auto get_keys = [&](const auto &record_accessor) {
|
||||
TypedValue::TVector keys(ctx.memory);
|
||||
auto maybe_props = record_accessor.Properties();
|
||||
// if (maybe_props.HasError()) {
|
||||
// switch (maybe_props.GetError()) {
|
||||
// case storage::v3::Error::DELETED_OBJECT:
|
||||
// throw QueryRuntimeException("Trying to get keys from a deleted object.");
|
||||
// case storage::v3::Error::NONEXISTENT_OBJECT:
|
||||
// throw query::v2::QueryRuntimeException("Trying to get keys from an object that doesn't exist.");
|
||||
// case storage::v3::Error::SERIALIZATION_ERROR:
|
||||
// case storage::v3::Error::VERTEX_HAS_EDGES:
|
||||
// case storage::v3::Error::PROPERTIES_DISABLED:
|
||||
// throw QueryRuntimeException("Unexpected error when getting keys.");
|
||||
// }
|
||||
// }
|
||||
for (const auto &property : maybe_props) {
|
||||
keys.emplace_back(property.first);
|
||||
}
|
||||
return TypedValue(std::move(keys));
|
||||
};
|
||||
const auto &value = args[0];
|
||||
if (value.IsNull()) {
|
||||
return TypedValue(ctx.memory);
|
||||
} else if (value.IsVertex()) {
|
||||
return get_keys(value.ValueVertex());
|
||||
} else {
|
||||
return get_keys(value.ValueEdge());
|
||||
}
|
||||
// auto get_keys = [&](const auto &record_accessor) {
|
||||
// TypedValue::TVector keys(ctx.memory);
|
||||
// auto maybe_props = record_accessor.Properties();
|
||||
// // if (maybe_props.HasError()) {
|
||||
// // switch (maybe_props.GetError()) {
|
||||
// // case storage::v3::Error::DELETED_OBJECT:
|
||||
// // throw QueryRuntimeException("Trying to get keys from a deleted object.");
|
||||
// // case storage::v3::Error::NONEXISTENT_OBJECT:
|
||||
// // throw query::v2::QueryRuntimeException("Trying to get keys from an object that doesn't exist.");
|
||||
// // case storage::v3::Error::SERIALIZATION_ERROR:
|
||||
// // case storage::v3::Error::VERTEX_HAS_EDGES:
|
||||
// // case storage::v3::Error::PROPERTIES_DISABLED:
|
||||
// // throw QueryRuntimeException("Unexpected error when getting keys.");
|
||||
// // }
|
||||
// // }
|
||||
// for (const auto &property : maybe_props) {
|
||||
// keys.emplace_back(property.first);
|
||||
// }
|
||||
// return TypedValue(std::move(keys));
|
||||
// };
|
||||
// const auto &value = args[0];
|
||||
// if (value.IsNull()) {
|
||||
// return TypedValue(ctx.memory);
|
||||
// } else if (value.IsVertex()) {
|
||||
// return get_keys(value.ValueVertex());
|
||||
// } else {
|
||||
// return get_keys(value.ValueEdge());
|
||||
// }
|
||||
return TypedValue{};
|
||||
}
|
||||
|
||||
TypedValue Labels(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
|
||||
|
@ -38,6 +38,8 @@
|
||||
#include "query/v2/procedure/cypher_types.hpp"
|
||||
//#include "query/v2/procedure/mg_procedure_impl.hpp"
|
||||
//#include "query/v2/procedure/module.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "query/v2/shard_request_manager.hpp"
|
||||
#include "storage/v3/conversions.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
@ -408,9 +410,11 @@ class ScanAllCursor : public Cursor {
|
||||
op_name_(op_name) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
return false;
|
||||
// SCOPED_PROFILE_OP(op_name_);
|
||||
// auto &shard_manager = *context.shard_request_manager;
|
||||
// if (MustAbort(context)) throw HintedAbortError();
|
||||
|
||||
//
|
||||
// while (!vertices_ || vertices_it_.value() == vertices_.value().end()) {
|
||||
// if (!input_cursor_->Pull(frame, context)) return false;
|
||||
@ -445,6 +449,8 @@ class ScanAllCursor : public Cursor {
|
||||
std::optional<typename std::result_of<TVerticesFun(Frame &, ExecutionContext &)>::type::value_type> vertices_;
|
||||
std::optional<decltype(vertices_.value().begin())> vertices_it_;
|
||||
const char *op_name_;
|
||||
std::vector<requests::ScanVerticesResponse> current_batch;
|
||||
requests::ExecutionState<requests::ScanVerticesRequest> request_state;
|
||||
};
|
||||
|
||||
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)
|
||||
@ -4137,4 +4143,93 @@ bool Foreach::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
|
||||
return visitor.PostVisit(*this);
|
||||
}
|
||||
|
||||
class DistributedScanAllCursor : public Cursor {
|
||||
public:
|
||||
explicit DistributedScanAllCursor(Symbol output_symbol, UniqueCursorPtr input_cursor, const char *op_name)
|
||||
: output_symbol_(output_symbol), input_cursor_(std::move(input_cursor)), op_name_(op_name) {}
|
||||
|
||||
using VertexAccessor = accessors::VertexAccessor;
|
||||
|
||||
bool MakeRequest(requests::ShardRequestManagerInterface &shard_manager) {
|
||||
current_batch = shard_manager.Request(request_state_);
|
||||
current_vertex_it = current_batch.begin();
|
||||
return !current_batch.empty();
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
auto &shard_manager = *context.shard_request_manager;
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
using State = requests::ExecutionState<requests::ScanVerticesRequest>;
|
||||
|
||||
if (request_state_.state == State::INITIALIZING) {
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
}
|
||||
|
||||
if (current_vertex_it == current_batch.end()) {
|
||||
if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) {
|
||||
ResetExecutionState();
|
||||
return Pull(frame, context);
|
||||
}
|
||||
}
|
||||
|
||||
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
|
||||
++current_vertex_it;
|
||||
return true;
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void ResetExecutionState() {
|
||||
current_batch.clear();
|
||||
current_vertex_it = current_batch.end();
|
||||
request_state_ = requests::ExecutionState<requests::ScanVerticesRequest>{};
|
||||
}
|
||||
|
||||
void Reset() override {
|
||||
input_cursor_->Reset();
|
||||
ResetExecutionState();
|
||||
}
|
||||
|
||||
private:
|
||||
const Symbol output_symbol_;
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
const char *op_name_;
|
||||
std::vector<VertexAccessor> current_batch;
|
||||
decltype(std::vector<VertexAccessor>().begin()) current_vertex_it;
|
||||
requests::ExecutionState<requests::ScanVerticesRequest> request_state_;
|
||||
};
|
||||
|
||||
class DistributedCreateNodeCursor : public Cursor {
|
||||
public:
|
||||
using InputOperator = std::shared_ptr<memgraph::query::v2::plan::LogicalOperator>;
|
||||
DistributedCreateNodeCursor(const InputOperator &op, utils::MemoryResource *mem,
|
||||
std::vector<NodeCreationInfo> nodes_info)
|
||||
: input_cursor_(op->MakeCursor(mem)), nodes_info_(std::move(nodes_info)) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP("CreateNode");
|
||||
if (input_cursor_->Pull(frame, context)) {
|
||||
auto &shard_manager = context.shard_request_manager;
|
||||
shard_manager->Request(state_, NodeCreationInfoToRequest());
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override { state_ = {}; }
|
||||
|
||||
std::vector<requests::NewVertexLabel> NodeCreationInfoToRequest() const {
|
||||
// TODO(kostasrim) Add the conversion
|
||||
return {};
|
||||
}
|
||||
|
||||
private:
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
std::vector<NodeCreationInfo> nodes_info_;
|
||||
requests::ExecutionState<requests::CreateVerticesRequest> state_;
|
||||
};
|
||||
} // namespace memgraph::query::v2::plan
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <unordered_map>
|
||||
#include <variant>
|
||||
@ -140,7 +141,41 @@ struct Value {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Value(Value &&other) noexcept {};
|
||||
|
||||
Value(Value &&other) noexcept : type(other.type) {
|
||||
switch (other.type) {
|
||||
case Type::NILL:
|
||||
break;
|
||||
case Type::BOOL:
|
||||
this->bool_v = other.bool_v;
|
||||
break;
|
||||
case Type::INT64:
|
||||
this->int_v = other.int_v;
|
||||
break;
|
||||
case Type::DOUBLE:
|
||||
this->double_v = other.double_v;
|
||||
break;
|
||||
case Type::STRING:
|
||||
new (&string_v) std::string(std::move(other.string_v));
|
||||
break;
|
||||
case Type::LIST:
|
||||
new (&list_v) std::vector<Value>(std::move(other.list_v));
|
||||
break;
|
||||
case Type::MAP:
|
||||
new (&map_v) std::map<std::string, Value>(std::move(other.map_v));
|
||||
break;
|
||||
case Type::VERTEX:
|
||||
new (&vertex_v) Vertex(std::move(other.vertex_v));
|
||||
break;
|
||||
case Type::EDGE:
|
||||
new (&edge_v) Edge(std::move(other.edge_v));
|
||||
break;
|
||||
case Type::PATH:
|
||||
new (&path_v) Path(std::move(other.path_v));
|
||||
break;
|
||||
}
|
||||
other.type = Type::NILL;
|
||||
}
|
||||
|
||||
explicit Value(const bool val) : bool_v(val), type(BOOL){};
|
||||
explicit Value(const int64_t val) : int_v(val), type(INT64){};
|
||||
@ -222,7 +257,39 @@ struct Value {
|
||||
}
|
||||
}
|
||||
|
||||
~Value(){};
|
||||
~Value() {
|
||||
switch (type) {
|
||||
// destructor for primitive types does nothing
|
||||
case Type::NILL:
|
||||
case Type::BOOL:
|
||||
case Type::INT64:
|
||||
case Type::DOUBLE:
|
||||
return;
|
||||
|
||||
// destructor for non primitive types since we used placement new
|
||||
case Type::STRING:
|
||||
std::destroy_at(&string_v);
|
||||
return;
|
||||
case Type::LIST:
|
||||
std::destroy_at(&list_v);
|
||||
return;
|
||||
case Type::MAP:
|
||||
std::destroy_at(&map_v);
|
||||
return;
|
||||
|
||||
// are these needed to be defined?
|
||||
case Type::VERTEX:
|
||||
std::destroy_at(&vertex_v);
|
||||
return;
|
||||
case Type::PATH:
|
||||
std::destroy_at(&path_v);
|
||||
return;
|
||||
case Type::EDGE:
|
||||
std::destroy_at(&edge_v);
|
||||
default:
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct ValuesMap {
|
||||
@ -265,10 +332,16 @@ struct ScanVerticesRequest {
|
||||
StorageView storage_view;
|
||||
};
|
||||
|
||||
struct ScanResultRow {
|
||||
Vertex vertex;
|
||||
// empty is no properties returned
|
||||
std::map<PropertyId, Value> props;
|
||||
};
|
||||
|
||||
struct ScanVerticesResponse {
|
||||
bool success;
|
||||
Values values;
|
||||
std::optional<VertexId> next_start_id;
|
||||
std::vector<ScanResultRow> results;
|
||||
};
|
||||
|
||||
using VertexOrEdgeIds = std::variant<VertexId, EdgeId>;
|
||||
|
@ -33,6 +33,7 @@
|
||||
#include "io/rsm/shard_rsm.hpp"
|
||||
#include "io/simulator/simulator.hpp"
|
||||
#include "io/simulator/simulator_transport.hpp"
|
||||
#include "query/v2/accessors.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "utils/result.hpp"
|
||||
@ -89,26 +90,34 @@ struct ExecutionState {
|
||||
|
||||
class ShardRequestManagerInterface {
|
||||
public:
|
||||
using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor;
|
||||
ShardRequestManagerInterface() = default;
|
||||
virtual void StartTransaction() = 0;
|
||||
virtual std::vector<ScanVerticesResponse> Request(ExecutionState<ScanVerticesRequest> &state) = 0;
|
||||
virtual std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) = 0;
|
||||
virtual std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
|
||||
std::vector<requests::NewVertexLabel> new_vertices) = 0;
|
||||
virtual std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) = 0;
|
||||
virtual ~ShardRequestManagerInterface() {}
|
||||
ShardRequestManagerInterface(const ShardRequestManagerInterface &) = delete;
|
||||
ShardRequestManagerInterface(ShardRequestManagerInterface &&) = delete;
|
||||
};
|
||||
|
||||
// TODO(kostasrim)rename this class template
|
||||
template <typename TTransport, typename... Rest>
|
||||
template <typename TTransport>
|
||||
class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
public:
|
||||
using WriteRequests = CreateVerticesRequest;
|
||||
using WriteResponses = CreateVerticesResponse;
|
||||
using StorageClient = memgraph::coordinator::RsmClient<TTransport, WriteRequests, WriteResponses, Rest...>;
|
||||
using ReadRequests = std::variant<ScanVerticesRequest, ExpandOneRequest>;
|
||||
using ReadResponses = std::variant<ScanVerticesResponse, ExpandOneResponse>;
|
||||
using StorageClient =
|
||||
memgraph::coordinator::RsmClient<TTransport, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
|
||||
using CoordinatorClient = memgraph::coordinator::CoordinatorClient<TTransport>;
|
||||
using Address = memgraph::io::Address;
|
||||
using Shard = memgraph::coordinator::Shard;
|
||||
using ShardMap = memgraph::coordinator::ShardMap;
|
||||
using CompoundKey = memgraph::coordinator::CompoundKey;
|
||||
using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor;
|
||||
ShardRequestManager(CoordinatorClient coord, memgraph::io::Io<TTransport> &&io)
|
||||
: coord_cli_(std::move(coord)), io_(std::move(io)) {}
|
||||
|
||||
@ -136,12 +145,12 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
}
|
||||
|
||||
// TODO(kostasrim) Simplify return result
|
||||
std::vector<ScanVerticesResponse> Request(ExecutionState<ScanVerticesRequest> &state) override {
|
||||
std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) override {
|
||||
MaybeInitializeExecutionState(state);
|
||||
std::vector<ScanVerticesResponse> responses;
|
||||
auto &shard_cacheref = state.shard_cache;
|
||||
auto &shard_cache_ref = state.shard_cache;
|
||||
size_t id = 0;
|
||||
for (auto shard_it = shard_cacheref.begin(); shard_it != shard_cacheref.end(); ++id) {
|
||||
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
|
||||
auto &storage_client = GetStorageClientForShard(*state.label, state.requests[id].start_id.primary_key);
|
||||
// TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture instead.
|
||||
auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
|
||||
@ -150,26 +159,27 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
if (read_response_result.HasError()) {
|
||||
throw std::runtime_error("ScanAll request timedout");
|
||||
}
|
||||
if (read_response_result.GetValue().success == false) {
|
||||
auto &response = std::get<ScanVerticesResponse>(read_response_result.GetValue());
|
||||
if (response.success == false) {
|
||||
throw std::runtime_error("ScanAll request did not succeed");
|
||||
}
|
||||
responses.push_back(read_response_result.GetValue());
|
||||
if (!read_response_result.GetValue().next_start_id) {
|
||||
shard_it = shard_cacheref.erase(shard_it);
|
||||
if (!response.next_start_id) {
|
||||
shard_it = shard_cache_ref.erase(shard_it);
|
||||
} else {
|
||||
state.requests[id].start_id.primary_key = read_response_result.GetValue().next_start_id->primary_key;
|
||||
state.requests[id].start_id.primary_key = response.next_start_id->primary_key;
|
||||
++shard_it;
|
||||
}
|
||||
responses.push_back(std::move(response));
|
||||
}
|
||||
// We are done with this state
|
||||
MaybeCompleteState(state);
|
||||
// TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return
|
||||
// result of storage_client.SendReadRequest()).
|
||||
return responses;
|
||||
return PostProcess(std::move(responses));
|
||||
}
|
||||
|
||||
std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
|
||||
std::vector<NewVertexLabel> new_vertices) {
|
||||
std::vector<NewVertexLabel> new_vertices) override {
|
||||
MG_ASSERT(!new_vertices.empty());
|
||||
MaybeInitializeExecutionState(state, new_vertices);
|
||||
std::vector<CreateVerticesResponse> responses;
|
||||
@ -199,7 +209,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
return responses;
|
||||
}
|
||||
|
||||
std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) {
|
||||
std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) override {
|
||||
// TODO(kostasrim)Update to limit the batch size here
|
||||
// Expansions of the destination must be handled by the caller. For example
|
||||
// match (u:L1 { prop : 1 })-[:Friend]-(v:L1)
|
||||
@ -219,15 +229,23 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
if (read_response_result.HasError()) {
|
||||
throw std::runtime_error("ExpandOne request timedout");
|
||||
}
|
||||
if (read_response_result.GetValue().success == false) {
|
||||
throw std::runtime_error("ExpandOne request did not succeed");
|
||||
}
|
||||
responses.push_back(read_response_result.GetValue());
|
||||
auto &response = std::get<ExpandOneResponse>(read_response_result.GetValue());
|
||||
responses.push_back(std::move(response));
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<VertexAccessor> PostProcess(std::vector<ScanVerticesResponse> &&responses) const {
|
||||
std::vector<VertexAccessor> accessors;
|
||||
for (auto &response : responses) {
|
||||
for (auto result_row : response.results) {
|
||||
accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props)));
|
||||
}
|
||||
}
|
||||
return accessors;
|
||||
}
|
||||
|
||||
template <typename ExecutionState>
|
||||
void ThrowIfStateCompleted(ExecutionState &state) const {
|
||||
if (state.state == ExecutionState::COMPLETED) [[unlikely]] {
|
||||
|
@ -14,7 +14,7 @@ function(add_simulation_test test_cpp san)
|
||||
# used to help create two targets of the same name even though CMake
|
||||
# requires unique logical target names
|
||||
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
|
||||
target_link_libraries(${target_name} gtest gmock mg-utils mg-io mg-io-simulator)
|
||||
target_link_libraries(${target_name} gtest gmock mg-utils mg-io mg-io-simulator mg-query-v2)
|
||||
|
||||
# sanitize
|
||||
target_compile_options(${target_name} PRIVATE -fsanitize=${san})
|
||||
|
@ -53,6 +53,8 @@ using memgraph::storage::v3::LabelId;
|
||||
using memgraph::storage::v3::PropertyValue;
|
||||
using requests::CreateVerticesRequest;
|
||||
using requests::CreateVerticesResponse;
|
||||
using requests::ExpandOneRequest;
|
||||
using requests::ExpandOneResponse;
|
||||
using requests::ListedValues;
|
||||
using requests::ScanVerticesRequest;
|
||||
using requests::ScanVerticesResponse;
|
||||
@ -78,29 +80,29 @@ class MockedShardRsm {
|
||||
public:
|
||||
// ExpandOneResponse Read(ExpandOneRequest rqst);
|
||||
// GetPropertiesResponse Read(GetPropertiesRequest rqst);
|
||||
ScanVerticesResponse Read(ScanVerticesRequest rqst) {
|
||||
ScanVerticesResponse ReadImpl(ScanVerticesRequest rqst) {
|
||||
ScanVerticesResponse ret;
|
||||
if (!IsKeyInRange(rqst.start_id.primary_key)) {
|
||||
ret.success = false;
|
||||
} else if (rqst.start_id.primary_key == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) {
|
||||
Value val(int64_t(0));
|
||||
ListedValues listed_values;
|
||||
listed_values.properties.push_back(std::vector<Value>{val});
|
||||
ret.next_start_id = std::make_optional<VertexId>();
|
||||
ret.next_start_id->primary_key = ShardRsmKey{PropertyValue(1), PropertyValue(0)};
|
||||
ret.values = std::move(listed_values);
|
||||
requests::ScanResultRow result;
|
||||
result.props.insert(std::make_pair(requests::PropertyId::FromUint(0), val));
|
||||
ret.results.push_back(std::move(result));
|
||||
ret.success = true;
|
||||
} else if (rqst.start_id.primary_key == ShardRsmKey{PropertyValue(1), PropertyValue(0)}) {
|
||||
requests::ScanResultRow result;
|
||||
Value val(int64_t(1));
|
||||
ListedValues listed_values;
|
||||
listed_values.properties.push_back(std::vector<Value>{val});
|
||||
ret.values = std::move(listed_values);
|
||||
result.props.insert(std::make_pair(requests::PropertyId::FromUint(0), val));
|
||||
ret.results.push_back(std::move(result));
|
||||
ret.success = true;
|
||||
} else if (rqst.start_id.primary_key == ShardRsmKey{PropertyValue(12), PropertyValue(13)}) {
|
||||
requests::ScanResultRow result;
|
||||
Value val(int64_t(444));
|
||||
ListedValues listed_values;
|
||||
listed_values.properties.push_back(std::vector<Value>{val});
|
||||
ret.values = std::move(listed_values);
|
||||
result.props.insert(std::make_pair(requests::PropertyId::FromUint(0), val));
|
||||
ret.results.push_back(std::move(result));
|
||||
ret.success = true;
|
||||
} else {
|
||||
ret.success = false;
|
||||
@ -108,5 +110,14 @@ class MockedShardRsm {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ExpandOneResponse ReadImpl(ExpandOneRequest rqst) { return {}; }
|
||||
using ReadRequests = std::variant<ScanVerticesRequest, ExpandOneRequest>;
|
||||
using ReadResponses = std::variant<ScanVerticesResponse, ExpandOneResponse>;
|
||||
|
||||
ReadResponses Read(ReadRequests read_requests) {
|
||||
return {std::visit([this](auto &&request) { return ReadResponses{ReadImpl(std::move(request))}; },
|
||||
std::move(read_requests))};
|
||||
}
|
||||
|
||||
CreateVerticesResponse Apply(CreateVerticesRequest request) { return CreateVerticesResponse{.success = true}; }
|
||||
};
|
||||
|
@ -29,10 +29,10 @@
|
||||
#include "io/rsm/shard_rsm.hpp"
|
||||
#include "io/simulator/simulator.hpp"
|
||||
#include "io/simulator/simulator_transport.hpp"
|
||||
#include "query/v2/accessors.hpp"
|
||||
#include "query/v2/conversions.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "query/v2/shard_request_manager.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
@ -134,14 +134,17 @@ ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::co
|
||||
|
||||
} // namespace
|
||||
|
||||
using WriteRequests = CreateVerticesRequest;
|
||||
using WriteResponses = CreateVerticesResponse;
|
||||
using ReadRequests = std::variant<ScanVerticesRequest, ExpandOneRequest>;
|
||||
using ReadResponses = std::variant<ScanVerticesResponse, ExpandOneResponse>;
|
||||
|
||||
using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>;
|
||||
using ConcreteStorageRsm = Raft<SimulatorTransport, MockedShardRsm, CreateVerticesRequest, CreateVerticesResponse,
|
||||
ScanVerticesRequest, ScanVerticesResponse>;
|
||||
using ConcreteStorageRsm =
|
||||
Raft<SimulatorTransport, MockedShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
|
||||
|
||||
template <typename IoImpl>
|
||||
void RunStorageRaft(Raft<IoImpl, MockedShardRsm, CreateVerticesRequest, CreateVerticesResponse, ScanVerticesRequest,
|
||||
ScanVerticesResponse>
|
||||
server) {
|
||||
void RunStorageRaft(Raft<IoImpl, MockedShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses> server) {
|
||||
server.Run();
|
||||
}
|
||||
|
||||
@ -149,20 +152,22 @@ template <typename ShardRequestManager>
|
||||
void TestScanAll(ShardRequestManager &io) {
|
||||
requests::ExecutionState<ScanVerticesRequest> state{.label = "test_label"};
|
||||
|
||||
std::cout << "Testing ScanALl" << std::endl;
|
||||
auto result = io.Request(state);
|
||||
std::cout << "Testing ScanALl" << std::endl;
|
||||
MG_ASSERT(result.size() == 2);
|
||||
{
|
||||
auto &list_of_values_1 = std::get<ListedValues>(result[0].values);
|
||||
MG_ASSERT(list_of_values_1.properties[0][0].int_v == 0);
|
||||
auto &list_of_values_2 = std::get<ListedValues>(result[1].values);
|
||||
MG_ASSERT(list_of_values_2.properties[0][0].int_v == 444);
|
||||
auto prop = result[0].GetProperty(requests::PropertyId::FromUint(0));
|
||||
MG_ASSERT(prop.int_v == 0);
|
||||
prop = result[1].GetProperty(requests::PropertyId::FromUint(0));
|
||||
MG_ASSERT(prop.int_v == 444);
|
||||
}
|
||||
|
||||
result = io.Request(state);
|
||||
{
|
||||
MG_ASSERT(result.size() == 1);
|
||||
auto &list_of_values_1 = std::get<ListedValues>(result[0].values);
|
||||
MG_ASSERT(list_of_values_1.properties[0][0].int_v == 1);
|
||||
auto prop = result[0].GetProperty(requests::PropertyId::FromUint(0));
|
||||
MG_ASSERT(prop.int_v == 1);
|
||||
}
|
||||
|
||||
// Exhaust it, request should be empty
|
||||
@ -172,6 +177,7 @@ void TestScanAll(ShardRequestManager &io) {
|
||||
|
||||
template <typename ShardRequestManager>
|
||||
void TestCreateVertices(ShardRequestManager &io) {
|
||||
std::cout << "Testing Create" << std::endl;
|
||||
using PropVal = memgraph::storage::v3::PropertyValue;
|
||||
requests::ExecutionState<CreateVerticesRequest> state;
|
||||
std::vector<NewVertexLabel> new_vertices;
|
||||
@ -292,8 +298,7 @@ int main() {
|
||||
// also get the current shard map
|
||||
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
|
||||
|
||||
requests::ShardRequestManager<SimulatorTransport, ScanVerticesRequest, ScanVerticesResponse> io(
|
||||
std::move(coordinator_client), std::move(cli_io));
|
||||
requests::ShardRequestManager<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io));
|
||||
|
||||
io.StartTransaction();
|
||||
TestScanAll(io);
|
||||
|
Loading…
Reference in New Issue
Block a user