Added TypedValue with new accessors and simplified requests
This commit is contained in:
parent
002e08b4f8
commit
2eaad15804
@ -116,14 +116,17 @@ class TypedValueT {
|
||||
return hash;
|
||||
}
|
||||
case TypedValueT::Type::Vertex:
|
||||
return value.ValueVertex().Gid().AsUint();
|
||||
return 34;
|
||||
// return value.ValueVertex().Gid().AsUint();
|
||||
case TypedValueT::Type::Edge:
|
||||
return value.ValueEdge().Gid().AsUint();
|
||||
return 35;
|
||||
// return value.ValueEdge().Gid().AsUint();
|
||||
case TypedValueT::Type::Path: {
|
||||
const auto &vertices = value.ValuePath().vertices();
|
||||
const auto &edges = value.ValuePath().edges();
|
||||
return utils::FnvCollection<decltype(vertices), TVertexAccessor>{}(vertices) ^
|
||||
utils::FnvCollection<decltype(edges), TEdgeAccessor>{}(edges);
|
||||
// const auto &vertices = value.ValuePath().vertices();
|
||||
// const auto &edges = value.ValuePath().edges();
|
||||
// return utils::FnvCollection<decltype(vertices), TVertexAccessor>{}(vertices) ^
|
||||
// utils::FnvCollection<decltype(edges), TEdgeAccessor>{}(edges);
|
||||
return 36;
|
||||
}
|
||||
case TypedValueT::Type::Date:
|
||||
return utils::DateHash{}(value.ValueDate());
|
||||
|
@ -33,7 +33,8 @@ set(mg_query_v2_sources
|
||||
stream/common.cpp
|
||||
trigger.cpp
|
||||
trigger_context.cpp
|
||||
bindings/typed_value.cpp)
|
||||
bindings/typed_value.cpp
|
||||
accessors.cpp)
|
||||
|
||||
find_package(Boost REQUIRED)
|
||||
|
||||
|
59
src/query/v2/accessors.cpp
Normal file
59
src/query/v2/accessors.cpp
Normal file
@ -0,0 +1,59 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "query/v2/accessors.hpp"
|
||||
|
||||
namespace memgraph::query::v2::accessors {
|
||||
EdgeAccessor::EdgeAccessor(Edge edge, std::map<std::string, Value> props)
|
||||
: edge(std::move(edge)), properties(std::move(props)) {}
|
||||
|
||||
std::string EdgeAccessor::EdgeType() const { return edge.type.name; }
|
||||
|
||||
std::map<std::string, Value> EdgeAccessor::Properties() const {
|
||||
return properties;
|
||||
// std::map<std::string, TypedValue> res;
|
||||
// for (const auto &[name, value] : *properties) {
|
||||
// res[name] = ValueToTypedValue(value);
|
||||
// }
|
||||
// return res;
|
||||
}
|
||||
|
||||
Value EdgeAccessor::GetProperty(const std::string &prop_name) {
|
||||
MG_ASSERT(properties.contains(prop_name));
|
||||
return properties[prop_name];
|
||||
}
|
||||
|
||||
VertexAccessor::VertexAccessor(Vertex v, std::map<std::string, Value> props)
|
||||
: vertex(std::move(v)), properties(std::move(props)) {}
|
||||
|
||||
std::vector<Label> VertexAccessor::Labels() const { return vertex.labels; }
|
||||
|
||||
bool VertexAccessor::HasLabel(Label &label) const {
|
||||
return std::find_if(vertex.labels.begin(), vertex.labels.end(),
|
||||
[label](const auto &l) { return l.id == label.id; }) != vertex.labels.end();
|
||||
}
|
||||
|
||||
std::map<std::string, Value> VertexAccessor::Properties() const {
|
||||
// std::map<std::string, TypedValue> res;
|
||||
// for (const auto &[name, value] : *properties) {
|
||||
// res[name] = ValueToTypedValue(value);
|
||||
// }
|
||||
// return res;
|
||||
return properties;
|
||||
}
|
||||
|
||||
Value VertexAccessor::GetProperty(const std::string &prop_name) {
|
||||
MG_ASSERT(properties.contains(prop_name));
|
||||
return Value(properties[prop_name]);
|
||||
// return ValueToTypedValue(properties[prop_name]);
|
||||
}
|
||||
|
||||
} // namespace memgraph::query::v2::accessors
|
@ -18,59 +18,33 @@
|
||||
#include "storage/v3/view.hpp"
|
||||
#include "utils/bound.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
|
||||
TypedValue ValueToTypedValue(const Value &value) {
|
||||
switch (value.type) {
|
||||
case Value::NILL:
|
||||
return {};
|
||||
case Value::BOOL:
|
||||
return {value.bool_v};
|
||||
case Value::INT64:
|
||||
return {value.int_v};
|
||||
case Value::DOUBLE:
|
||||
return {value.double_v};
|
||||
case Value::STRING:
|
||||
return {value.string_v};
|
||||
case Value::LIST:
|
||||
return {value.list_v};
|
||||
case Value::MAP:
|
||||
return {value.map_v};
|
||||
case Value::VERTEX:
|
||||
case Value::EDGE:
|
||||
case Value::PATH:
|
||||
}
|
||||
std::runtime_error("Incorrect type in conversion");
|
||||
}
|
||||
namespace memgraph::query::v2::accessors {
|
||||
|
||||
using Value = requests::Value;
|
||||
using Edge = requests::Edge;
|
||||
using Vertex = requests::Vertex;
|
||||
using Label = requests::Label;
|
||||
|
||||
class VertexAccessor;
|
||||
|
||||
class EdgeAccessor final {
|
||||
public:
|
||||
EdgeAccessor(Edge *edge, std::map<std::string, Value> *props) : edge(edge), properties(props) {
|
||||
MG_ASSERT(edge != nullptr);
|
||||
MG_ASSERT(properties != nullptr);
|
||||
}
|
||||
EdgeAccessor(Edge edge, std::map<std::string, Value> props);
|
||||
|
||||
std::string EdgeType() const { return edge->type.name; }
|
||||
std::string EdgeType() const;
|
||||
|
||||
std::map<std::string, TypedValue> Properties() const {
|
||||
std::map<std::string, TypedValue> res;
|
||||
for (const auto &[name, value] : *properties) {
|
||||
res[name] = ValueToTypedValue(value);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
std::map<std::string, Value> Properties() const;
|
||||
|
||||
TypedValue GetProperty(const std::string &prop_name) const {
|
||||
MG_ASSERT(properties->contains(prop_name));
|
||||
return ValueToTypedValue(properties[prop_name]);
|
||||
}
|
||||
Value GetProperty(const std::string &prop_name);
|
||||
|
||||
// bool HasSrcAccessor const { return src == nullptr; }
|
||||
// bool HasDstAccessor const { return dst == nullptr; }
|
||||
|
||||
// VertexAccessor To() const;
|
||||
// VertexAccessor From() const;
|
||||
// VertexAccessor To() const;
|
||||
// VertexAccessor From() const;
|
||||
|
||||
friend bool operator==(const EdgeAccessor &lhs, const EdgeAccessor &rhs) noexcept {
|
||||
return lhs.edge == rhs.edge && lhs.properties == rhs.properties;
|
||||
@ -79,38 +53,23 @@ class EdgeAccessor final {
|
||||
friend bool operator!=(const EdgeAccessor &lhs, const EdgeAccessor &rhs) noexcept { return !(lhs == rhs); }
|
||||
|
||||
private:
|
||||
Edge *edge;
|
||||
Edge edge;
|
||||
// VertexAccessor *src {nullptr};
|
||||
// VertexAccessor *dst {nullptr};
|
||||
std::map<std::string, Value> *properties;
|
||||
std::map<std::string, Value> properties;
|
||||
};
|
||||
|
||||
class VertexAccessor final {
|
||||
public:
|
||||
VertexAccessor(Vertex *v, std::map<std::string, Value> *props) : vertex(v), properties(props) {
|
||||
MG_ASSERT(vertex != nullptr);
|
||||
MG_ASSERT(properties != nullptr);
|
||||
}
|
||||
VertexAccessor(Vertex v, std::map<std::string, Value> props);
|
||||
|
||||
std::vector<Label> Labels() const { return vertex->labels; }
|
||||
std::vector<Label> Labels() const;
|
||||
|
||||
bool HasLabel(Label &label) const {
|
||||
return std::find_if(vertex->labels.begin(), vertex->labels.end(),
|
||||
[label](const auto &l) { return l.id == label.id; }) != vertex->labels.end();
|
||||
}
|
||||
bool HasLabel(Label &label) const;
|
||||
|
||||
std::map<std::string, TypedValue> Properties() const {
|
||||
std::map<std::string, TypedValue> res;
|
||||
for (const auto &[name, value] : *properties) {
|
||||
res[name] = ValueToTypedValue(value);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
std::map<std::string, Value> Properties() const;
|
||||
|
||||
TypedValue GetProperty(const std::string &prop_name) const {
|
||||
MG_ASSERT(properties->contains(prop_name));
|
||||
return ValueToTypedValue(properties[prop_name]);
|
||||
}
|
||||
Value GetProperty(const std::string &prop_name);
|
||||
|
||||
// auto InEdges(storage::View view, const std::vector<storage::EdgeTypeId> &edge_types) const
|
||||
// -> storage::Result<decltype(iter::imap(MakeEdgeAccessor, *impl_.InEdges(view)))> {
|
||||
@ -158,10 +117,39 @@ class VertexAccessor final {
|
||||
friend bool operator!=(const VertexAccessor lhs, const VertexAccessor &rhs) noexcept { return !(lhs == rhs); }
|
||||
|
||||
private:
|
||||
Vertex *vertex;
|
||||
std::map<std::string, Value> *properties;
|
||||
Vertex vertex;
|
||||
std::map<std::string, Value> properties;
|
||||
};
|
||||
|
||||
//inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }
|
||||
// inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }
|
||||
|
||||
//inline VertexAccessor EdgeAccessor::From() const { return VertexAccessor(impl_.FromVertex()); }
|
||||
// inline VertexAccessor EdgeAccessor::From() const { return VertexAccessor(impl_.FromVertex()); }
|
||||
|
||||
// Highly mocked interface. Won't work if used.
|
||||
class Path {
|
||||
public:
|
||||
// Empty for now
|
||||
explicit Path(const VertexAccessor &vertex, utils::MemoryResource *memory = utils::NewDeleteResource())
|
||||
: mem(memory) {}
|
||||
|
||||
template <typename... TOthers>
|
||||
explicit Path(const VertexAccessor &vertex, const TOthers &...others) {}
|
||||
|
||||
template <typename... TOthers>
|
||||
Path(std::allocator_arg_t, utils::MemoryResource *memory, const VertexAccessor &vertex, const TOthers &...others) {}
|
||||
|
||||
Path(const Path &other) {}
|
||||
|
||||
Path(const Path &other, utils::MemoryResource *memory) : mem(memory) {}
|
||||
|
||||
Path(Path &&other) noexcept {}
|
||||
|
||||
Path(Path &&other, utils::MemoryResource *memory) : mem(memory) {}
|
||||
Path &operator=(const Path &path) { return *this; }
|
||||
friend bool operator==(const Path &lhs, const Path &rhs) { return true; };
|
||||
utils::MemoryResource *GetMemoryResource() { return mem; }
|
||||
|
||||
private:
|
||||
utils::MemoryResource *mem = utils::NewDeleteResource();
|
||||
};
|
||||
} // namespace memgraph::query::v2::accessors
|
||||
|
@ -10,10 +10,10 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "expr/typed_value.hpp"
|
||||
#include "query/v2/db_accessor.hpp"
|
||||
#include "query/v2/accessors.hpp"
|
||||
#include "query/v2/path.hpp"
|
||||
|
||||
namespace memgraph::expr {
|
||||
namespace v2 = memgraph::query::v2;
|
||||
template class TypedValueT<v2::VertexAccessor, v2::EdgeAccessor, v2::Path>;
|
||||
template class TypedValueT<v2::accessors::VertexAccessor, v2::accessors::EdgeAccessor, v2::accessors::Path>;
|
||||
} // namespace memgraph::expr
|
||||
|
@ -14,13 +14,14 @@
|
||||
#include "query/v2/bindings/bindings.hpp"
|
||||
|
||||
#include "expr/typed_value.hpp"
|
||||
#include "query/v2/db_accessor.hpp"
|
||||
#include "query/v2/path.hpp"
|
||||
#include "query/v2/accessors.hpp"
|
||||
|
||||
namespace memgraph::expr {
|
||||
namespace v2 = memgraph::query::v2;
|
||||
extern template class memgraph::expr::TypedValueT<v2::VertexAccessor, v2::EdgeAccessor, v2::Path>;
|
||||
extern template class memgraph::expr::TypedValueT<v2::accessors::VertexAccessor, v2::accessors::EdgeAccessor,
|
||||
v2::accessors::Path>;
|
||||
} // namespace memgraph::expr
|
||||
namespace memgraph::query::v2 {
|
||||
using TypedValue = memgraph::expr::TypedValueT<VertexAccessor, EdgeAccessor, Path>;
|
||||
using TypedValue =
|
||||
memgraph::expr::TypedValueT<v2::accessors::VertexAccessor, v2::accessors::EdgeAccessor, v2::accessors::Path>;
|
||||
} // namespace memgraph::query::v2
|
||||
|
47
src/query/v2/conversions.hpp
Normal file
47
src/query/v2/conversions.hpp
Normal file
@ -0,0 +1,47 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
#include "bindings/typed_value.hpp"
|
||||
#include "query/v2/accessors.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
|
||||
using requests::Value;
|
||||
|
||||
inline TypedValue ValueToTypedValue(const Value &value) {
|
||||
switch (value.type) {
|
||||
case Value::NILL:
|
||||
return {};
|
||||
case Value::BOOL:
|
||||
return TypedValue(value.bool_v);
|
||||
case Value::INT64:
|
||||
return TypedValue(value.int_v);
|
||||
case Value::DOUBLE:
|
||||
return TypedValue(value.double_v);
|
||||
case Value::STRING:
|
||||
return TypedValue(value.string_v);
|
||||
case Value::LIST:
|
||||
// return TypedValue(value.list_v);
|
||||
case Value::MAP:
|
||||
// return TypedValue(value.map_v);
|
||||
case Value::VERTEX:
|
||||
// return TypedValue(accessors::VertexAccessor(value.vertex_v, {}));
|
||||
case Value::EDGE:
|
||||
// return TypedValue(accessors::EdgeAccessor(value.edge_v, {}));
|
||||
case Value::PATH:
|
||||
break;
|
||||
}
|
||||
throw std::runtime_error("Incorrect type in conversion");
|
||||
}
|
||||
|
||||
} // namespace memgraph::query::v2
|
@ -26,18 +26,30 @@
|
||||
using memgraph::coordinator::Hlc;
|
||||
using memgraph::storage::v3::LabelId;
|
||||
|
||||
namespace requests {
|
||||
|
||||
struct Label {
|
||||
LabelId id;
|
||||
friend bool operator==(const Label &lhs, const Label &rhs) { return lhs.id == rhs.id; }
|
||||
};
|
||||
|
||||
// TODO(kostasrim) update this with CompoundKey, same for the rest of the file.
|
||||
using PrimaryKey = std::vector<memgraph::storage::v3::PropertyValue>;
|
||||
using VertexId = std::pair<Label, PrimaryKey>;
|
||||
|
||||
struct VertexId {
|
||||
Label primary_label;
|
||||
PrimaryKey primary_key;
|
||||
friend bool operator==(const VertexId &lhs, const VertexId &rhs) {
|
||||
return (lhs.primary_label == rhs.primary_label) && (lhs.primary_key == rhs.primary_key);
|
||||
}
|
||||
};
|
||||
|
||||
using Gid = size_t;
|
||||
using PropertyId = memgraph::storage::v3::PropertyId;
|
||||
|
||||
struct EdgeType {
|
||||
std::string name;
|
||||
friend bool operator==(const EdgeType &lhs, const EdgeType &rhs) = default;
|
||||
};
|
||||
|
||||
struct EdgeId {
|
||||
@ -48,12 +60,18 @@ struct EdgeId {
|
||||
struct Vertex {
|
||||
VertexId id;
|
||||
std::vector<Label> labels;
|
||||
friend bool operator==(const Vertex &lhs, const Vertex &rhs) {
|
||||
return (lhs.id == rhs.id) && (lhs.labels == rhs.labels);
|
||||
}
|
||||
};
|
||||
|
||||
struct Edge {
|
||||
VertexId src;
|
||||
VertexId dst;
|
||||
EdgeType type;
|
||||
friend bool operator==(const Edge &lhs, const Edge &rhs) {
|
||||
return (lhs.src == rhs.src) && (lhs.dst == rhs.dst) && (lhs.type == rhs.type);
|
||||
}
|
||||
};
|
||||
|
||||
struct PathPart {
|
||||
@ -73,7 +91,7 @@ struct Value {
|
||||
union {
|
||||
Null null_v;
|
||||
bool bool_v;
|
||||
int int_v;
|
||||
int64_t int_v;
|
||||
double double_v;
|
||||
std::string string_v;
|
||||
std::vector<Value> list_v;
|
||||
@ -85,7 +103,126 @@ struct Value {
|
||||
|
||||
Type type;
|
||||
|
||||
Value() : type(NILL), null_v{} {}
|
||||
Value() : null_v{}, type(NILL) {}
|
||||
|
||||
// copy ctor needed.
|
||||
Value(const Value &value) : type(value.type) {
|
||||
switch (value.type) {
|
||||
case NILL:
|
||||
null_v = {};
|
||||
break;
|
||||
case BOOL:
|
||||
bool_v = value.bool_v;
|
||||
break;
|
||||
case INT64:
|
||||
int_v = value.int_v;
|
||||
break;
|
||||
case DOUBLE:
|
||||
double_v = value.double_v;
|
||||
break;
|
||||
case STRING:
|
||||
string_v = value.string_v;
|
||||
break;
|
||||
case LIST:
|
||||
list_v = value.list_v;
|
||||
break;
|
||||
case MAP:
|
||||
map_v = value.map_v;
|
||||
break;
|
||||
case VERTEX:
|
||||
vertex_v = value.vertex_v;
|
||||
break;
|
||||
case EDGE:
|
||||
edge_v = value.edge_v;
|
||||
break;
|
||||
case PATH:
|
||||
path_v = value.path_v;
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Value(Value &&other) noexcept {};
|
||||
|
||||
explicit Value(const bool val) : bool_v(val), type(BOOL){};
|
||||
explicit Value(const int64_t val) : int_v(val), type(INT64){};
|
||||
explicit Value(const double val) : double_v(val), type(DOUBLE){};
|
||||
explicit Value(const std::string &val) : string_v(val), type(STRING){};
|
||||
|
||||
explicit Value(std::vector<Value> &&val) : list_v(std::move(val)), type(LIST){};
|
||||
explicit Value(std::map<std::string, Value> &&val) : map_v(std::move(val)), type(MAP){};
|
||||
|
||||
explicit Value(const Vertex &val) : vertex_v(val), type(VERTEX){};
|
||||
explicit Value(const Edge &val) : edge_v(val), type(EDGE){};
|
||||
explicit Value(const Path &val) : path_v(val), type(PATH){};
|
||||
|
||||
Value &operator=(const Value &value) {
|
||||
if (&value == this) {
|
||||
return *this;
|
||||
}
|
||||
type = value.type;
|
||||
switch (value.type) {
|
||||
case NILL:
|
||||
null_v = {};
|
||||
break;
|
||||
case BOOL:
|
||||
bool_v = value.bool_v;
|
||||
break;
|
||||
case INT64:
|
||||
int_v = value.int_v;
|
||||
break;
|
||||
case DOUBLE:
|
||||
double_v = value.double_v;
|
||||
break;
|
||||
case STRING:
|
||||
string_v = value.string_v;
|
||||
break;
|
||||
case LIST:
|
||||
list_v = value.list_v;
|
||||
break;
|
||||
case MAP:
|
||||
map_v = value.map_v;
|
||||
break;
|
||||
case VERTEX:
|
||||
vertex_v = value.vertex_v;
|
||||
break;
|
||||
case EDGE:
|
||||
edge_v = value.edge_v;
|
||||
break;
|
||||
case PATH:
|
||||
path_v = value.path_v;
|
||||
break;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
friend bool operator==(const Value &lhs, const Value &rhs) {
|
||||
if (lhs.type != rhs.type) {
|
||||
return false;
|
||||
}
|
||||
switch (lhs.type) {
|
||||
case NILL:
|
||||
return true;
|
||||
case BOOL:
|
||||
return lhs.bool_v == rhs.bool_v;
|
||||
case INT64:
|
||||
return lhs.int_v == rhs.int_v;
|
||||
case DOUBLE:
|
||||
return lhs.double_v == rhs.double_v;
|
||||
case STRING:
|
||||
return lhs.string_v == rhs.string_v;
|
||||
case LIST:
|
||||
return lhs.list_v == rhs.list_v;
|
||||
case MAP:
|
||||
return lhs.map_v == rhs.map_v;
|
||||
case VERTEX:
|
||||
return lhs.vertex_v == rhs.vertex_v;
|
||||
case EDGE:
|
||||
return lhs.edge_v == rhs.edge_v;
|
||||
case PATH:
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
~Value(){};
|
||||
};
|
||||
|
||||
struct ValuesMap {
|
||||
@ -230,3 +367,4 @@ using ReadResponses = std::variant<ExpandOneResponse, GetPropertiesResponse, Sca
|
||||
|
||||
using WriteRequests = CreateVerticesRequest;
|
||||
using WriteResponses = CreateVerticesResponse;
|
||||
} // namespace requests
|
||||
|
@ -37,6 +37,7 @@
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
namespace requests {
|
||||
template <typename TStorageClient>
|
||||
class RsmStorageClientManager {
|
||||
public:
|
||||
@ -111,6 +112,9 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
ShardRequestManager(CoordinatorClient coord, memgraph::io::Io<TTransport> &&io)
|
||||
: coord_cli_(std::move(coord)), io_(std::move(io)) {}
|
||||
|
||||
ShardRequestManager(const ShardRequestManager &) = delete;
|
||||
ShardRequestManager(ShardRequestManager &&) = delete;
|
||||
|
||||
~ShardRequestManager() override {}
|
||||
|
||||
void StartTransaction() override {
|
||||
@ -138,7 +142,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
auto &shard_cacheref = state.shard_cache;
|
||||
size_t id = 0;
|
||||
for (auto shard_it = shard_cacheref.begin(); shard_it != shard_cacheref.end(); ++id) {
|
||||
auto &storage_client = GetStorageClientForShard(*state.label, state.requests[id].start_id.second);
|
||||
auto &storage_client = GetStorageClientForShard(*state.label, state.requests[id].start_id.primary_key);
|
||||
// TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture instead.
|
||||
auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
|
||||
// RETRY on timeouts?
|
||||
@ -153,7 +157,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
if (!read_response_result.GetValue().next_start_id) {
|
||||
shard_it = shard_cacheref.erase(shard_it);
|
||||
} else {
|
||||
state.requests[id].start_id.second = read_response_result.GetValue().next_start_id->second;
|
||||
state.requests[id].start_id.primary_key = read_response_result.GetValue().next_start_id->primary_key;
|
||||
++shard_it;
|
||||
}
|
||||
}
|
||||
@ -196,28 +200,29 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
}
|
||||
|
||||
std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) {
|
||||
MaybeInitializeExecutionState(state);
|
||||
std::vector<ExpandOneResponse> responses;
|
||||
auto &shard_cache_ref = state.shard_cache;
|
||||
size_t id = 0;
|
||||
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
|
||||
// This is fine because all new_vertices of each request end up on the same shard
|
||||
const Label label = state.requests[id].new_vertices[0].label_ids;
|
||||
auto primary_key = state.requests[id].new_vertices[0].primary_key;
|
||||
auto &storage_client = GetStorageClientForShard(*shard_it, label.id);
|
||||
auto read_response_result = storage_client.SendWriteRequest(state.requests[id]);
|
||||
// RETRY on timeouts?
|
||||
// Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test
|
||||
if (read_response_result.HasError()) {
|
||||
throw std::runtime_error("Write request error");
|
||||
}
|
||||
if (read_response_result.GetValue().success == false) {
|
||||
throw std::runtime_error("Write request did not succeed");
|
||||
}
|
||||
responses.push_back(read_response_result.GetValue());
|
||||
shard_it = shard_cache_ref.erase(shard_it);
|
||||
}
|
||||
return responses;
|
||||
throw std::runtime_error("Not yet implemented request");
|
||||
// MaybeInitializeExecutionState(state);
|
||||
// std::vector<ExpandOneResponse> responses;
|
||||
// auto &shard_cache_ref = state.shard_cache;
|
||||
// size_t id = 0;
|
||||
// for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
|
||||
// // This is fine because all new_vertices of each request end up on the same shard
|
||||
// const Label label = state.requests[id].new_vertices[0].label_ids;
|
||||
// auto primary_key = state.requests[id].new_vertices[0].primary_key;
|
||||
// auto &storage_client = GetStorageClientForShard(*shard_it, label.id);
|
||||
// auto read_response_result = storage_client.SendWriteRequest(state.requests[id]);
|
||||
// // RETRY on timeouts?
|
||||
// // Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map
|
||||
// test if (read_response_result.HasError()) {
|
||||
// throw std::runtime_error("Write request error");
|
||||
// }
|
||||
// if (read_response_result.GetValue().success == false) {
|
||||
// throw std::runtime_error("Write request did not succeed");
|
||||
// }
|
||||
// responses.push_back(read_response_result.GetValue());
|
||||
// shard_it = shard_cache_ref.erase(shard_it);
|
||||
// }
|
||||
// return responses;
|
||||
}
|
||||
|
||||
private:
|
||||
@ -280,7 +285,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
state.shard_cache.push_back(std::move(shard));
|
||||
ScanVerticesRequest rqst;
|
||||
rqst.transaction_id = transaction_id_;
|
||||
rqst.start_id.second = std::move(key);
|
||||
rqst.start_id.primary_key = std::move(key);
|
||||
state.requests.push_back(std::move(rqst));
|
||||
}
|
||||
state.state = ExecutionState<ScanVerticesRequest>::EXECUTING;
|
||||
@ -350,3 +355,4 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
memgraph::coordinator::Hlc transaction_id_;
|
||||
// TODO(kostasrim) Add batch prefetching
|
||||
};
|
||||
} // namespace requests
|
||||
|
@ -51,6 +51,13 @@ using memgraph::io::simulator::SimulatorStats;
|
||||
using memgraph::io::simulator::SimulatorTransport;
|
||||
using memgraph::storage::v3::LabelId;
|
||||
using memgraph::storage::v3::PropertyValue;
|
||||
using requests::CreateVerticesRequest;
|
||||
using requests::CreateVerticesResponse;
|
||||
using requests::ListedValues;
|
||||
using requests::ScanVerticesRequest;
|
||||
using requests::ScanVerticesResponse;
|
||||
using requests::Value;
|
||||
using requests::VertexId;
|
||||
|
||||
using ShardRsmKey = std::vector<memgraph::storage::v3::PropertyValue>;
|
||||
|
||||
@ -73,24 +80,24 @@ class MockedShardRsm {
|
||||
// GetPropertiesResponse Read(GetPropertiesRequest rqst);
|
||||
ScanVerticesResponse Read(ScanVerticesRequest rqst) {
|
||||
ScanVerticesResponse ret;
|
||||
if (!IsKeyInRange(rqst.start_id.second)) {
|
||||
if (!IsKeyInRange(rqst.start_id.primary_key)) {
|
||||
ret.success = false;
|
||||
} else if (rqst.start_id.second == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) {
|
||||
Value val{.int_v = 0, .type = Value::Type::INT64};
|
||||
} else if (rqst.start_id.primary_key == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) {
|
||||
Value val(int64_t(0));
|
||||
ListedValues listed_values;
|
||||
listed_values.properties.push_back(std::vector<Value>{val});
|
||||
ret.next_start_id = std::make_optional<VertexId>();
|
||||
ret.next_start_id->second = ShardRsmKey{PropertyValue(1), PropertyValue(0)};
|
||||
ret.next_start_id->primary_key = ShardRsmKey{PropertyValue(1), PropertyValue(0)};
|
||||
ret.values = std::move(listed_values);
|
||||
ret.success = true;
|
||||
} else if (rqst.start_id.second == ShardRsmKey{PropertyValue(1), PropertyValue(0)}) {
|
||||
Value val{.int_v = 1, .type = Value::Type::INT64};
|
||||
} else if (rqst.start_id.primary_key == ShardRsmKey{PropertyValue(1), PropertyValue(0)}) {
|
||||
Value val(int64_t(1));
|
||||
ListedValues listed_values;
|
||||
listed_values.properties.push_back(std::vector<Value>{val});
|
||||
ret.values = std::move(listed_values);
|
||||
ret.success = true;
|
||||
} else if (rqst.start_id.second == ShardRsmKey{PropertyValue(12), PropertyValue(13)}) {
|
||||
Value val{.int_v = 444, .type = Value::Type::INT64};
|
||||
} else if (rqst.start_id.primary_key == ShardRsmKey{PropertyValue(12), PropertyValue(13)}) {
|
||||
Value val(int64_t(444));
|
||||
ListedValues listed_values;
|
||||
listed_values.properties.push_back(std::vector<Value>{val});
|
||||
ret.values = std::move(listed_values);
|
||||
|
@ -29,6 +29,7 @@
|
||||
#include "io/rsm/shard_rsm.hpp"
|
||||
#include "io/simulator/simulator.hpp"
|
||||
#include "io/simulator/simulator_transport.hpp"
|
||||
#include "query/v2/conversions.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "query/v2/shard_request_manager.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
@ -55,7 +56,6 @@ using memgraph::io::TimedOut;
|
||||
using memgraph::io::rsm::Raft;
|
||||
using memgraph::io::rsm::ReadRequest;
|
||||
using memgraph::io::rsm::ReadResponse;
|
||||
using memgraph::io::rsm::RsmClient;
|
||||
using memgraph::io::rsm::StorageReadRequest;
|
||||
using memgraph::io::rsm::StorageReadResponse;
|
||||
using memgraph::io::rsm::StorageWriteRequest;
|
||||
@ -69,9 +69,12 @@ using memgraph::io::simulator::SimulatorTransport;
|
||||
using memgraph::storage::v3::LabelId;
|
||||
using memgraph::storage::v3::SchemaProperty;
|
||||
using memgraph::utils::BasicResult;
|
||||
|
||||
using ShardClient =
|
||||
RsmClient<SimulatorTransport, StorageWriteRequest, StorageWriteResponse, ScanVerticesRequest, ScanVerticesResponse>;
|
||||
using requests::CreateVerticesRequest;
|
||||
using requests::CreateVerticesResponse;
|
||||
using requests::ListedValues;
|
||||
using requests::NewVertexLabel;
|
||||
using requests::ScanVerticesRequest;
|
||||
using requests::ScanVerticesResponse;
|
||||
|
||||
namespace {
|
||||
|
||||
@ -144,7 +147,7 @@ void RunStorageRaft(Raft<IoImpl, MockedShardRsm, CreateVerticesRequest, CreateVe
|
||||
|
||||
template <typename ShardRequestManager>
|
||||
void TestScanAll(ShardRequestManager &io) {
|
||||
ExecutionState<ScanVerticesRequest> state{.label = "test_label"};
|
||||
requests::ExecutionState<ScanVerticesRequest> state{.label = "test_label"};
|
||||
|
||||
auto result = io.Request(state);
|
||||
MG_ASSERT(result.size() == 2);
|
||||
@ -170,7 +173,7 @@ void TestScanAll(ShardRequestManager &io) {
|
||||
template <typename ShardRequestManager>
|
||||
void TestCreateVertices(ShardRequestManager &io) {
|
||||
using PropVal = memgraph::storage::v3::PropertyValue;
|
||||
ExecutionState<CreateVerticesRequest> state;
|
||||
requests::ExecutionState<CreateVerticesRequest> state;
|
||||
std::vector<NewVertexLabel> new_vertices;
|
||||
NewVertexLabel a1{.label = "test_label", .primary_key = {PropVal(1), PropVal(0)}};
|
||||
NewVertexLabel a2{.label = "test_label", .primary_key = {PropVal(13), PropVal(13)}};
|
||||
@ -289,8 +292,8 @@ int main() {
|
||||
// also get the current shard map
|
||||
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
|
||||
|
||||
ShardRequestManager<SimulatorTransport, ScanVerticesRequest, ScanVerticesResponse> io(std::move(coordinator_client),
|
||||
std::move(cli_io));
|
||||
requests::ShardRequestManager<SimulatorTransport, ScanVerticesRequest, ScanVerticesResponse> io(
|
||||
std::move(coordinator_client), std::move(cli_io));
|
||||
|
||||
io.StartTransaction();
|
||||
TestScanAll(io);
|
||||
|
Loading…
Reference in New Issue
Block a user