Make ExpandOne work in the query engine (#589)

This commit is contained in:
János Benjamin Antal 2022-10-20 11:35:00 +02:00 committed by GitHub
parent 85b8ce9101
commit f89a2bbf42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 884 additions and 697 deletions

View File

@ -208,7 +208,13 @@ struct ShardMap {
// Find a random place for the server to plug in
}
LabelId GetLabelId(const std::string &label) const { return labels.at(label); }
std::optional<LabelId> GetLabelId(const std::string &label) const {
if (const auto it = labels.find(label); it != labels.end()) {
return it->second;
}
return std::nullopt;
}
std::string GetLabelName(const LabelId label) const {
if (const auto it =
@ -220,8 +226,8 @@ struct ShardMap {
}
std::optional<PropertyId> GetPropertyId(const std::string &property_name) const {
if (properties.contains(property_name)) {
return properties.at(property_name);
if (const auto it = properties.find(property_name); it != properties.end()) {
return it->second;
}
return std::nullopt;
@ -237,8 +243,8 @@ struct ShardMap {
}
std::optional<EdgeTypeId> GetEdgeTypeId(const std::string &edge_type) const {
if (edge_types.contains(edge_type)) {
return edge_types.at(edge_type);
if (const auto it = edge_types.find(edge_type); it != edge_types.end()) {
return it->second;
}
return std::nullopt;

View File

@ -205,6 +205,7 @@ Value ToBoltValue(msgs::Value value) {
case msgs::Value::Type::Edge: {
throw utils::BasicException("Vertex and Edge not supported!");
}
// TODO Value to Date types not supported
}
}

View File

@ -643,10 +643,10 @@ int main(int argc, char **argv) {
const std::string label{"label"};
memgraph::coordinator::ShardMap sm;
auto prop_map = sm.AllocatePropertyIds(std::vector<std::string>{property});
auto edge_type_map = sm.AllocateEdgeTypeIds(std::vector<std::string>{"edge_type"});
auto edge_type_map = sm.AllocateEdgeTypeIds(std::vector<std::string>{"TO"});
std::vector<memgraph::storage::v3::SchemaProperty> schema{{prop_map.at(property), memgraph::common::SchemaType::INT}};
sm.InitializeNewLabel(label, schema, 1, sm.shard_map_version);
sm.SplitShard(sm.GetHlc(), sm.GetLabelId(label),
sm.SplitShard(sm.GetHlc(), *sm.GetLabelId(label),
std::vector<memgraph::storage::v3::PropertyValue>{memgraph::storage::v3::PropertyValue{2}});
memgraph::coordinator::Coordinator coordinator{sm};

View File

@ -14,13 +14,12 @@
#include "storage/v3/id_types.hpp"
namespace memgraph::query::v2::accessors {
EdgeAccessor::EdgeAccessor(Edge edge, std::vector<std::pair<PropertyId, Value>> props)
: edge(std::move(edge)), properties(std::move(props)) {}
EdgeAccessor::EdgeAccessor(Edge edge) : edge(std::move(edge)) {}
EdgeTypeId EdgeAccessor::EdgeType() const { return EdgeTypeId::FromUint(edge.type.id); }
EdgeTypeId EdgeAccessor::EdgeType() const { return edge.type.id; }
std::vector<std::pair<PropertyId, Value>> EdgeAccessor::Properties() const {
return properties;
const std::vector<std::pair<PropertyId, Value>> &EdgeAccessor::Properties() const {
return edge.properties;
// std::map<std::string, TypedValue> res;
// for (const auto &[name, value] : *properties) {
// res[name] = ValueToTypedValue(value);
@ -34,7 +33,9 @@ Value EdgeAccessor::GetProperty(const std::string & /*prop_name*/) const {
return {};
}
Edge EdgeAccessor::GetEdge() const { return edge; }
const Edge &EdgeAccessor::GetEdge() const { return edge; }
bool EdgeAccessor::IsCycle() const { return edge.src == edge.dst; };
VertexAccessor EdgeAccessor::To() const { return VertexAccessor(Vertex{edge.dst}, {}); }
@ -45,6 +46,8 @@ VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value
Label VertexAccessor::PrimaryLabel() const { return vertex.id.first; }
const msgs::VertexId &VertexAccessor::Id() const { return vertex.id; }
std::vector<Label> VertexAccessor::Labels() const { return vertex.labels; }
bool VertexAccessor::HasLabel(Label &label) const {
@ -52,14 +55,7 @@ bool VertexAccessor::HasLabel(Label &label) const {
[label](const auto &l) { return l.id == label.id; }) != vertex.labels.end();
}
std::vector<std::pair<PropertyId, Value>> VertexAccessor::Properties() const {
// std::map<std::string, TypedValue> res;
// for (const auto &[name, value] : *properties) {
// res[name] = ValueToTypedValue(value);
// }
// return res;
return properties;
}
const std::vector<std::pair<PropertyId, Value>> &VertexAccessor::Properties() const { return properties; }
Value VertexAccessor::GetProperty(PropertyId prop_id) const {
return std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; })->second;

View File

@ -36,59 +36,61 @@ class VertexAccessor;
class EdgeAccessor final {
public:
EdgeAccessor(Edge edge, std::vector<std::pair<PropertyId, Value>> props);
explicit EdgeAccessor(Edge edge);
EdgeTypeId EdgeType() const;
[[nodiscard]] EdgeTypeId EdgeType() const;
std::vector<std::pair<PropertyId, Value>> Properties() const;
[[nodiscard]] const std::vector<std::pair<PropertyId, Value>> &Properties() const;
Value GetProperty(const std::string &prop_name) const;
[[nodiscard]] Value GetProperty(const std::string &prop_name) const;
Edge GetEdge() const;
[[nodiscard]] const Edge &GetEdge() const;
[[nodiscard]] bool IsCycle() const;
// Dummy function
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
inline size_t CypherId() const { return 10; }
[[nodiscard]] size_t CypherId() const { return 10; }
// bool HasSrcAccessor const { return src == nullptr; }
// bool HasDstAccessor const { return dst == nullptr; }
VertexAccessor To() const;
VertexAccessor From() const;
[[nodiscard]] VertexAccessor To() const;
[[nodiscard]] VertexAccessor From() const;
friend bool operator==(const EdgeAccessor &lhs, const EdgeAccessor &rhs) {
return lhs.edge == rhs.edge && lhs.properties == rhs.properties;
}
friend bool operator==(const EdgeAccessor &lhs, const EdgeAccessor &rhs) { return lhs.edge == rhs.edge; }
friend bool operator!=(const EdgeAccessor &lhs, const EdgeAccessor &rhs) { return !(lhs == rhs); }
private:
Edge edge;
std::vector<std::pair<PropertyId, Value>> properties;
};
class VertexAccessor final {
public:
using PropertyId = msgs::PropertyId;
using Label = msgs::Label;
using VertexId = msgs::VertexId;
VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props);
Label PrimaryLabel() const;
[[nodiscard]] Label PrimaryLabel() const;
std::vector<Label> Labels() const;
[[nodiscard]] const msgs::VertexId &Id() const;
bool HasLabel(Label &label) const;
[[nodiscard]] std::vector<Label> Labels() const;
std::vector<std::pair<PropertyId, Value>> Properties() const;
[[nodiscard]] bool HasLabel(Label &label) const;
Value GetProperty(PropertyId prop_id) const;
Value GetProperty(const std::string &prop_name) const;
[[nodiscard]] const std::vector<std::pair<PropertyId, Value>> &Properties() const;
msgs::Vertex GetVertex() const;
[[nodiscard]] Value GetProperty(PropertyId prop_id) const;
[[nodiscard]] Value GetProperty(const std::string &prop_name) const;
[[nodiscard]] msgs::Vertex GetVertex() const;
// Dummy function
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
inline size_t CypherId() const { return 10; }
[[nodiscard]] size_t CypherId() const { return 10; }
// auto InEdges(storage::View view, const std::vector<storage::EdgeTypeId> &edge_types) const
// -> storage::Result<decltype(iter::imap(MakeEdgeAccessor, *impl_.InEdges(view)))> {

View File

@ -80,7 +80,7 @@ inline std::vector<storage::v3::LabelId> NamesToLabels(const std::vector<std::st
// TODO Fix by using reference
if (shard_request_manager != nullptr) {
for (const auto &name : label_names) {
labels.push_back(shard_request_manager->LabelNameToLabelId(name));
labels.push_back(shard_request_manager->NameToLabel(name));
}
}
return labels;

View File

@ -49,7 +49,7 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) {
case Value::Type::Vertex:
return TypedValue(accessors::VertexAccessor(value.vertex_v, {}));
case Value::Type::Edge:
return TypedValue(accessors::EdgeAccessor(value.edge_v, {}));
return TypedValue(accessors::EdgeAccessor(value.edge_v));
}
throw std::runtime_error("Incorrect type in conversion");
}

View File

@ -696,6 +696,7 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
ctx_.is_shutting_down = &interpreter_context->is_shutting_down;
ctx_.is_profile_query = is_profile_query;
ctx_.shard_request_manager = shard_request_manager;
ctx_.edge_ids_alloc = interpreter_context->edge_ids_alloc;
}
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,

View File

@ -424,6 +424,8 @@ ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_sy
ACCEPT_WITH_INPUT(ScanAll)
class DistributedScanAllCursor;
UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ScanAllOperator);
@ -562,10 +564,12 @@ Expand::Expand(const std::shared_ptr<LogicalOperator> &input, Symbol input_symbo
ACCEPT_WITH_INPUT(Expand)
class DistributedExpandCursor;
UniqueCursorPtr Expand::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ExpandOperator);
return MakeUniqueCursorPtr<ExpandCursor>(mem, *this, mem);
return MakeUniqueCursorPtr<DistributedExpandCursor>(mem, *this, mem);
}
std::vector<Symbol> Expand::ModifiedSymbols(const SymbolTable &table) const {
@ -2387,7 +2391,7 @@ class DistributedCreateExpandCursor : public Cursor {
msgs::NewExpand request{.id = {context.edge_ids_alloc.AllocateId()}};
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
storage::v3::View::NEW);
request.type = {edge_info.edge_type.AsUint()};
request.type = {edge_info.edge_type};
if (const auto *edge_info_properties = std::get_if<PropertiesMapList>(&edge_info.properties)) {
for (const auto &[property, value_expression] : *edge_info_properties) {
TypedValue val = value_expression->Accept(evaluator);
@ -2447,4 +2451,156 @@ class DistributedCreateExpandCursor : public Cursor {
msgs::ExecutionState<msgs::CreateExpandRequest> state_;
};
class DistributedExpandCursor : public Cursor {
public:
explicit DistributedExpandCursor(const Expand &self, utils::MemoryResource *mem)
: self_(self),
input_cursor_(self.input_->MakeCursor(mem)),
current_in_edge_it_(current_in_edges_.begin()),
current_out_edge_it_(current_out_edges_.begin()) {
if (self_.common_.existing_node) {
throw QueryRuntimeException("Cannot use existing node with DistributedExpandOne cursor!");
}
}
using VertexAccessor = accessors::VertexAccessor;
using EdgeAccessor = accessors::EdgeAccessor;
bool InitEdges(Frame &frame, ExecutionContext &context) {
// Input Vertex could be null if it is created by a failed optional match. In
// those cases we skip that input pull and continue with the next.
while (true) {
if (!input_cursor_->Pull(frame, context)) return false;
TypedValue &vertex_value = frame[self_.input_symbol_];
// Null check due to possible failed optional match.
if (vertex_value.IsNull()) continue;
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
auto &vertex = vertex_value.ValueVertex();
static constexpr auto direction_to_msgs_direction = [](const EdgeAtom::Direction direction) {
switch (direction) {
case EdgeAtom::Direction::IN:
return msgs::EdgeDirection::IN;
case EdgeAtom::Direction::OUT:
return msgs::EdgeDirection::OUT;
case EdgeAtom::Direction::BOTH:
return msgs::EdgeDirection::BOTH;
}
};
msgs::ExpandOneRequest request;
request.direction = direction_to_msgs_direction(self_.common_.direction);
// to not fetch any properties of the edges
request.edge_properties.emplace();
request.src_vertices.push_back(vertex.Id());
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front();
const auto convert_edges = [&vertex](
std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties> &&edge_messages,
const EdgeAtom::Direction direction) {
std::vector<EdgeAccessor> edge_accessors;
edge_accessors.reserve(edge_messages.size());
switch (direction) {
case EdgeAtom::Direction::IN: {
for (auto &edge : edge_messages) {
edge_accessors.emplace_back(
msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type});
}
break;
}
case EdgeAtom::Direction::OUT: {
for (auto &edge : edge_messages) {
edge_accessors.emplace_back(
msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type});
}
break;
}
case EdgeAtom::Direction::BOTH: {
LOG_FATAL("Must indicate exact expansion direction here");
}
}
return edge_accessors;
};
current_in_edges_ =
convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::IN);
current_in_edge_it_ = current_in_edges_.begin();
current_in_edges_ =
convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::OUT);
current_in_edge_it_ = current_in_edges_.begin();
return true;
}
}
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("DistributedExpand");
// A helper function for expanding a node from an edge.
auto pull_node = [this, &frame](const EdgeAccessor &new_edge, EdgeAtom::Direction direction) {
if (self_.common_.existing_node) return;
switch (direction) {
case EdgeAtom::Direction::IN:
frame[self_.common_.node_symbol] = new_edge.From();
break;
case EdgeAtom::Direction::OUT:
frame[self_.common_.node_symbol] = new_edge.To();
break;
case EdgeAtom::Direction::BOTH:
LOG_FATAL("Must indicate exact expansion direction here");
}
};
while (true) {
if (MustAbort(context)) throw HintedAbortError();
// attempt to get a value from the incoming edges
if (current_in_edge_it_ != current_in_edges_.end()) {
auto &edge = *current_in_edge_it_;
++current_in_edge_it_;
frame[self_.common_.edge_symbol] = edge;
pull_node(edge, EdgeAtom::Direction::IN);
return true;
}
// attempt to get a value from the outgoing edges
if (current_out_edge_it_ != current_out_edges_.end()) {
auto &edge = *current_out_edge_it_;
++current_out_edge_it_;
if (self_.common_.direction == EdgeAtom::Direction::BOTH && edge.IsCycle()) {
continue;
};
frame[self_.common_.edge_symbol] = edge;
pull_node(edge, EdgeAtom::Direction::OUT);
return true;
}
// If we are here, either the edges have not been initialized,
// or they have been exhausted. Attempt to initialize the edges.
if (!InitEdges(frame, context)) return false;
// we have re-initialized the edges, continue with the loop
}
}
void Shutdown() override { input_cursor_->Shutdown(); }
void Reset() override {
input_cursor_->Reset();
current_in_edges_.clear();
current_out_edges_.clear();
current_in_edge_it_ = current_in_edges_.end();
current_out_edge_it_ = current_out_edges_.end();
}
private:
const Expand &self_;
const UniqueCursorPtr input_cursor_;
std::vector<EdgeAccessor> current_in_edges_;
std::vector<EdgeAccessor> current_out_edges_;
std::vector<EdgeAccessor>::iterator current_in_edge_it_;
std::vector<EdgeAccessor>::iterator current_out_edge_it_;
};
} // namespace memgraph::query::v2::plan

View File

@ -31,7 +31,7 @@ class VertexCountCache {
public:
explicit VertexCountCache(TDbAccessor *shard_request_manager) : shard_request_manager_{shard_request_manager} {}
auto NameToLabel(const std::string &name) { return shard_request_manager_->LabelNameToLabelId(name); }
auto NameToLabel(const std::string &name) { return shard_request_manager_->NameToLabel(name); }
auto NameToProperty(const std::string &name) { return shard_request_manager_->NameToProperty(name); }
auto NameToEdgeType(const std::string &name) { return shard_request_manager_->NameToEdgeType(name); }

View File

@ -50,7 +50,7 @@ using PropertyId = memgraph::storage::v3::PropertyId;
using EdgeTypeId = memgraph::storage::v3::EdgeTypeId;
struct EdgeType {
uint64_t id;
EdgeTypeId id;
friend bool operator==(const EdgeType &lhs, const EdgeType &rhs) = default;
};
@ -64,7 +64,7 @@ struct EdgeId {
struct Edge {
VertexId src;
VertexId dst;
std::optional<std::vector<std::pair<PropertyId, Value>>> properties;
std::vector<std::pair<PropertyId, Value>> properties;
EdgeId id;
EdgeType type;
friend bool operator==(const Edge &lhs, const Edge &rhs) { return lhs.id == rhs.id; }
@ -317,20 +317,6 @@ struct Value {
}
};
struct ValuesMap {
std::unordered_map<PropertyId, Value> values_map;
};
struct MappedValues {
std::vector<ValuesMap> values_map;
};
struct ListedValues {
std::vector<std::vector<Value>> properties;
};
using Values = std::variant<ListedValues, MappedValues>;
struct Expression {
std::string expression;
};
@ -350,7 +336,9 @@ enum class StorageView { OLD = 0, NEW = 1 };
struct ScanVerticesRequest {
Hlc transaction_id;
// This should be optional
VertexId start_id;
// The empty optional means return all of the properties, while an empty list means do not return any properties
std::optional<std::vector<PropertyId>> props_to_return;
// expression that determines if vertex is returned or not
std::vector<std::string> filter_expressions;
@ -366,6 +354,7 @@ struct ScanVerticesRequest {
struct ScanResultRow {
Vertex vertex;
// empty() is no properties returned
// This should be changed to std::map<PropertyId, Value>
std::vector<std::pair<PropertyId, Value>> props;
std::vector<Value> evaluated_vertex_expressions;
};
@ -380,6 +369,7 @@ using VertexOrEdgeIds = std::variant<VertexId, EdgeId>;
struct GetPropertiesRequest {
Hlc transaction_id;
// Shouldn't contain mixed vertex and edge ids
VertexOrEdgeIds vertex_or_edge_ids;
std::vector<PropertyId> property_ids;
std::vector<Expression> expressions;
@ -391,44 +381,48 @@ struct GetPropertiesRequest {
struct GetPropertiesResponse {
bool success;
Values values;
};
enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 };
struct VertexEdgeId {
VertexId vertex_id;
std::optional<EdgeId> next_id;
};
struct ExpandOneRequest {
// TODO(antaljanosbenjamin): Filtering based on the id of the other end of the edge?
Hlc transaction_id;
std::vector<VertexId> src_vertices;
// return types that type is in this list
// empty means all the types
std::vector<EdgeType> edge_types;
EdgeDirection direction;
EdgeDirection direction{EdgeDirection::OUT};
// Wether to return multiple edges between the same neighbors
bool only_unique_neighbor_rows = false;
// The empty optional means return all of the properties, while an empty
// list means do not return any properties
// TODO(antaljanosbenjamin): All of the special values should be communicated through a single vertex object
// after schema is implemented
// Special values are accepted:
// * __mg__labels
// The empty optional means return all of the properties, while an empty list means do not return any properties
std::optional<std::vector<PropertyId>> src_vertex_properties;
// TODO(antaljanosbenjamin): All of the special values should be communicated through a single vertex object
// after schema is implemented
// Special values are accepted:
// * __mg__dst_id (Vertex, but without labels)
// * __mg__type (binary)
// The empty optional means return all of the properties, while an empty list means do not return any properties
std::optional<std::vector<PropertyId>> edge_properties;
// QUESTION(antaljanosbenjamin): Maybe also add possibility to expressions evaluated on the source vertex?
// List of expressions evaluated on edges
std::vector<Expression> expressions;
std::optional<std::vector<OrderBy>> order_by;
// Limit the edges or the vertices?
std::optional<size_t> limit;
std::optional<Filter> filter;
};
struct ExpandOneResultRow {
struct EdgeWithAllProperties {
VertexId other_end;
EdgeType type;
Gid gid;
std::map<PropertyId, Value> properties;
};
struct EdgeWithSpecificProperties {
VertexId other_end;
EdgeType type;
Gid gid;
std::vector<Value> properties;
};
// NOTE: This struct could be a single Values with columns something like this:
// src_vertex(Vertex), vertex_prop1(Value), vertex_prop2(Value), edges(list<Value>)
// where edges might be a list of:
@ -437,15 +431,17 @@ struct ExpandOneResultRow {
// The drawback of this is currently the key of the map is always interpreted as a string in Value, not as an
// integer, which should be in case of mapped properties.
Vertex src_vertex;
std::optional<std::map<PropertyId, Value>> src_vertex_properties;
std::map<PropertyId, Value> src_vertex_properties;
// NOTE: If the desired edges are specified in the request,
// edges_with_specific_properties will have a value and it will
// return the properties as a vector of property values. The order
// of the values returned should be the same as the PropertyIds
// were defined in the request.
std::optional<std::vector<std::tuple<VertexId, Gid, std::map<PropertyId, Value>>>> edges_with_all_properties;
std::optional<std::vector<std::tuple<VertexId, Gid, std::vector<Value>>>> edges_with_specific_properties;
std::vector<EdgeWithAllProperties> in_edges_with_all_properties;
std::vector<EdgeWithSpecificProperties> in_edges_with_specific_properties;
std::vector<EdgeWithAllProperties> out_edges_with_all_properties;
std::vector<EdgeWithSpecificProperties> out_edges_with_specific_properties;
};
struct ExpandOneResponse {
@ -455,6 +451,7 @@ struct ExpandOneResponse {
struct UpdateVertexProp {
PrimaryKey primary_key;
// This should be a map
std::vector<std::pair<PropertyId, Value>> property_updates;
};
@ -462,6 +459,7 @@ struct UpdateEdgeProp {
EdgeId edge_id;
VertexId src;
VertexId dst;
// This should be a map
std::vector<std::pair<PropertyId, Value>> property_updates;
};
@ -471,12 +469,7 @@ struct UpdateEdgeProp {
struct NewVertex {
std::vector<Label> label_ids;
PrimaryKey primary_key;
std::vector<std::pair<PropertyId, Value>> properties;
};
struct NewVertexLabel {
std::string label;
PrimaryKey primary_key;
// This should be a map
std::vector<std::pair<PropertyId, Value>> properties;
};

View File

@ -14,7 +14,9 @@
#include <chrono>
#include <deque>
#include <iostream>
#include <iterator>
#include <map>
#include <numeric>
#include <optional>
#include <random>
#include <set>
@ -116,14 +118,14 @@ class ShardRequestManagerInterface {
virtual std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) = 0;
virtual std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
std::vector<NewVertex> new_vertices) = 0;
virtual std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) = 0;
virtual std::vector<ExpandOneResultRow> Request(ExecutionState<ExpandOneRequest> &state,
ExpandOneRequest request) = 0;
virtual std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state,
std::vector<NewExpand> new_edges) = 0;
// TODO(antaljanosbenjamin): unify the GetXXXId and NameToId functions to have consistent naming, return type and
// implementation
virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0;
virtual storage::v3::PropertyId NameToProperty(const std::string &name) const = 0;
virtual storage::v3::LabelId LabelNameToLabelId(const std::string &name) const = 0;
virtual storage::v3::LabelId NameToLabel(const std::string &name) const = 0;
virtual const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const = 0;
virtual const std::string &LabelToName(memgraph::storage::v3::LabelId label) const = 0;
virtual const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const = 0;
@ -209,15 +211,15 @@ class ShardRequestManager : public ShardRequestManagerInterface {
}
storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const override {
return *shards_map_.GetEdgeTypeId(name);
return shards_map_.GetEdgeTypeId(name).value();
}
storage::v3::PropertyId NameToProperty(const std::string &name) const override {
return *shards_map_.GetPropertyId(name);
return shards_map_.GetPropertyId(name).value();
}
storage::v3::LabelId LabelNameToLabelId(const std::string &name) const override {
return shards_map_.GetLabelId(name);
storage::v3::LabelId NameToLabel(const std::string &name) const override {
return shards_map_.GetLabelId(name).value();
}
const std::string &PropertyToName(memgraph::storage::v3::PropertyId /*prop*/) const override {
@ -317,13 +319,13 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return responses;
}
std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) override {
std::vector<ExpandOneResultRow> Request(ExecutionState<ExpandOneRequest> &state, ExpandOneRequest request) override {
// TODO(kostasrim)Update to limit the batch size here
// Expansions of the destination must be handled by the caller. For example
// match (u:L1 { prop : 1 })-[:Friend]-(v:L1)
// For each vertex U, the ExpandOne will result in <U, Edges>. The destination vertex and its properties
// must be fetched again with an ExpandOne(Edges.dst)
MaybeInitializeExecutionState(state);
MaybeInitializeExecutionState(state, std::move(request));
std::vector<ExpandOneResponse> responses;
auto &shard_cache_ref = state.shard_cache;
@ -334,9 +336,18 @@ class ShardRequestManager : public ShardRequestManagerInterface {
do {
AwaitOnResponses(state, responses);
} while (!state.shard_cache.empty());
std::vector<ExpandOneResultRow> result_rows;
const auto total_row_count = std::accumulate(
responses.begin(), responses.end(), 0,
[](const int64_t partial_count, const ExpandOneResponse &resp) { return partial_count + resp.result.size(); });
result_rows.reserve(total_row_count);
for (auto &response : responses) {
result_rows.insert(result_rows.end(), std::make_move_iterator(response.result.begin()),
std::make_move_iterator(response.result.end()));
}
MaybeCompleteState(state);
return responses;
return result_rows;
}
private:
@ -455,7 +466,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
state.state = ExecutionState<ScanVerticesRequest>::EXECUTING;
}
void MaybeInitializeExecutionState(ExecutionState<ExpandOneRequest> &state) {
void MaybeInitializeExecutionState(ExecutionState<ExpandOneRequest> &state, ExpandOneRequest request) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
@ -463,24 +474,18 @@ class ShardRequestManager : public ShardRequestManagerInterface {
state.transaction_id = transaction_id_;
std::map<Shard, ExpandOneRequest> per_shard_request_table;
MG_ASSERT(state.requests.size() == 1);
auto top_level_rqst = std::move(*state.requests.begin());
auto top_level_rqst_template = top_level_rqst;
auto top_level_rqst_template = request;
top_level_rqst_template.transaction_id = transaction_id_;
top_level_rqst_template.src_vertices.clear();
top_level_rqst_template.edge_types.clear();
state.requests.clear();
size_t id = 0;
for (const auto &vertex : top_level_rqst.src_vertices) {
for (auto &vertex : request.src_vertices) {
auto shard =
shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second));
if (!per_shard_request_table.contains(shard)) {
ExpandOneRequest expand_v_rqst = top_level_rqst_template;
per_shard_request_table.insert(std::pair(shard, std::move(expand_v_rqst)));
per_shard_request_table.insert(std::pair(shard, top_level_rqst_template));
state.shard_cache.push_back(shard);
}
per_shard_request_table[shard].src_vertices.push_back(vertex);
per_shard_request_table[shard].edge_types.push_back(top_level_rqst.edge_types[id]);
++id;
}
for (auto &[shard, rqst] : per_shard_request_table) {
@ -545,10 +550,11 @@ class ShardRequestManager : public ShardRequestManagerInterface {
void SendAllRequests(ExecutionState<ExpandOneRequest> &state,
std::vector<memgraph::coordinator::Shard> &shard_cache_ref) {
size_t id = 0;
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++shard_it) {
auto &storage_client = GetStorageClientForShard(*shard_it);
ReadRequests req = state.requests[id];
storage_client.SendAsyncReadRequest(req);
++id;
}
}
@ -593,13 +599,13 @@ class ShardRequestManager : public ShardRequestManagerInterface {
auto &shard_cache_ref = state.shard_cache;
int64_t request_idx = 0;
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++request_idx) {
auto &storage_client = GetStorageClientForShard(
*state.label,
storage::conversions::ConvertPropertyVector(state.requests[request_idx].src_vertices[0].second));
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) {
auto &storage_client = GetStorageClientForShard(*shard_it);
auto poll_result = storage_client.PollAsyncReadRequest();
if (!poll_result) {
++shard_it;
++request_idx;
continue;
}
@ -622,7 +628,6 @@ class ShardRequestManager : public ShardRequestManagerInterface {
// Needed to maintain the 1-1 mapping between the ShardCache and the requests.
auto it = state.requests.begin() + request_idx;
state.requests.erase(it);
--request_idx;
}
}

View File

@ -83,7 +83,7 @@ class DbAccessor final {
}
storage::v3::ResultSchema<VertexAccessor> InsertVertexAndValidate(
storage::v3::LabelId primary_label, const std::vector<storage::v3::LabelId> &labels,
const storage::v3::LabelId primary_label, const std::vector<storage::v3::LabelId> &labels,
const std::vector<std::pair<storage::v3::PropertyId, storage::v3::PropertyValue>> &properties) {
auto maybe_vertex_acc = accessor_->CreateVertexAndValidate(primary_label, labels, properties);
if (maybe_vertex_acc.HasError()) {

View File

@ -342,6 +342,8 @@ class Shard final {
LabelId PrimaryLabel() const;
[[nodiscard]] bool IsVertexBelongToShard(const VertexId &vertex_id) const;
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, std::optional<uint64_t> desired_commit_timestamp = {});
@ -376,8 +378,6 @@ class Shard final {
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
[[nodiscard]] bool IsVertexBelongToShard(const VertexId &vertex_id) const;
// Main object storage
NameIdMapper name_id_mapper_;
LabelId primary_label_;

View File

@ -33,26 +33,26 @@
#include "storage/v3/storage.hpp"
#include "storage/v3/value_conversions.hpp"
#include "storage/v3/vertex_accessor.hpp"
#include "storage/v3/vertex_id.hpp"
#include "storage/v3/view.hpp"
using memgraph::msgs::Label;
using memgraph::msgs::PropertyId;
using memgraph::msgs::Value;
using memgraph::msgs::Vertex;
using memgraph::msgs::VertexId;
using memgraph::storage::conversions::ConvertPropertyVector;
using memgraph::storage::conversions::ConvertValueVector;
using memgraph::storage::conversions::FromPropertyValueToValue;
using memgraph::storage::conversions::ToPropertyValue;
using memgraph::storage::v3::View;
namespace memgraph::storage::v3 {
using msgs::Label;
using msgs::PropertyId;
using msgs::Value;
using conversions::ConvertPropertyVector;
using conversions::ConvertValueVector;
using conversions::FromPropertyValueToValue;
using conversions::ToMsgsVertexId;
using conversions::ToPropertyValue;
namespace {
std::vector<std::pair<memgraph::storage::v3::PropertyId, memgraph::storage::v3::PropertyValue>> ConvertPropertyMap(
namespace msgs = msgs;
std::vector<std::pair<PropertyId, PropertyValue>> ConvertPropertyMap(
std::vector<std::pair<PropertyId, Value>> &&properties) {
std::vector<std::pair<memgraph::storage::v3::PropertyId, memgraph::storage::v3::PropertyValue>> ret;
std::vector<std::pair<PropertyId, PropertyValue>> ret;
ret.reserve(properties.size());
std::transform(std::make_move_iterator(properties.begin()), std::make_move_iterator(properties.end()),
@ -63,9 +63,8 @@ std::vector<std::pair<memgraph::storage::v3::PropertyId, memgraph::storage::v3::
return ret;
}
std::vector<std::pair<memgraph::storage::v3::PropertyId, Value>> FromMap(
const std::map<PropertyId, Value> &properties) {
std::vector<std::pair<memgraph::storage::v3::PropertyId, Value>> ret;
std::vector<std::pair<PropertyId, Value>> FromMap(const std::map<PropertyId, Value> &properties) {
std::vector<std::pair<PropertyId, Value>> ret;
ret.reserve(properties.size());
std::transform(properties.begin(), properties.end(), std::back_inserter(ret),
@ -74,9 +73,9 @@ std::vector<std::pair<memgraph::storage::v3::PropertyId, Value>> FromMap(
return ret;
}
std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(
const memgraph::storage::v3::VertexAccessor &acc, const std::vector<memgraph::storage::v3::PropertyId> &props,
View view) {
std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc,
const std::vector<PropertyId> &props,
View view) {
std::map<PropertyId, Value> ret;
for (const auto &prop : props) {
@ -86,19 +85,14 @@ std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor
return std::nullopt;
}
auto &value = result.GetValue();
if (value.IsNull()) {
spdlog::debug("The specified property does not exist but it should");
return std::nullopt;
}
ret.emplace(std::make_pair(prop, FromPropertyValueToValue(value)));
ret.emplace(std::make_pair(prop, FromPropertyValueToValue(std::move(value))));
}
return ret;
}
std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(
const memgraph::storage::v3::VertexAccessor &acc, memgraph::storage::v3::View view,
const memgraph::storage::v3::Schemas::Schema *schema) {
std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view,
const Schemas::Schema *schema) {
std::map<PropertyId, Value> ret;
auto props = acc.Properties(view);
if (props.HasError()) {
@ -106,36 +100,40 @@ std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(
return std::nullopt;
}
const auto &properties = props.GetValue();
std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()), [](const auto &property) {
return std::make_pair(property.first, FromPropertyValueToValue(property.second));
});
auto &properties = props.GetValue();
std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()),
[](std::pair<const PropertyId, PropertyValue> &pair) {
return std::make_pair(pair.first, FromPropertyValueToValue(std::move(pair.second)));
});
properties.clear();
// TODO(antaljanosbenjamin): Once the VertexAccessor::Properties returns also the primary keys, we can get rid of this
// code.
auto maybe_pk = acc.PrimaryKey(view);
if (maybe_pk.HasError()) {
spdlog::debug("Encountered an error while trying to get vertex primary key.");
}
const auto pk = maybe_pk.GetValue();
auto &pk = maybe_pk.GetValue();
MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!");
for (size_t i{0}; i < schema->second.size(); ++i) {
ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(pk[i]));
ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i])));
}
return ret;
}
memgraph::msgs::Value ConstructValueVertex(const memgraph::storage::v3::VertexAccessor &acc, View view) {
msgs::Value ConstructValueVertex(const VertexAccessor &acc, View view) {
// Get the vertex id
auto prim_label = acc.PrimaryLabel(view).GetValue();
memgraph::msgs::Label value_label{.id = prim_label};
msgs::Label value_label{.id = prim_label};
auto prim_key = ConvertValueVector(acc.PrimaryKey(view).GetValue());
memgraph::msgs::VertexId vertex_id = std::make_pair(value_label, prim_key);
msgs::VertexId vertex_id = std::make_pair(value_label, prim_key);
// Get the labels
auto vertex_labels = acc.Labels(view).GetValue();
std::vector<memgraph::msgs::Label> value_labels;
std::vector<msgs::Label> value_labels;
value_labels.reserve(vertex_labels.size());
std::transform(vertex_labels.begin(), vertex_labels.end(), std::back_inserter(value_labels),
@ -144,37 +142,37 @@ memgraph::msgs::Value ConstructValueVertex(const memgraph::storage::v3::VertexAc
return Value({.id = vertex_id, .labels = value_labels});
}
Value ConstructValueEdge(const memgraph::storage::v3::EdgeAccessor &acc, View view) {
memgraph::msgs::EdgeType type = {.id = acc.EdgeType().AsUint()};
memgraph::msgs::EdgeId gid = {.gid = acc.Gid().AsUint()};
Value ConstructValueEdge(const EdgeAccessor &acc, View view) {
msgs::EdgeType type = {.id = acc.EdgeType()};
msgs::EdgeId gid = {.gid = acc.Gid().AsUint()};
Label src_prim_label = {.id = acc.FromVertex().primary_label};
memgraph::msgs::VertexId src_vertex =
std::make_pair(src_prim_label, ConvertValueVector(acc.FromVertex().primary_key));
msgs::VertexId src_vertex = std::make_pair(src_prim_label, ConvertValueVector(acc.FromVertex().primary_key));
Label dst_prim_label = {.id = acc.ToVertex().primary_label};
memgraph::msgs::VertexId dst_vertex = std::make_pair(dst_prim_label, ConvertValueVector(acc.ToVertex().primary_key));
msgs::VertexId dst_vertex = std::make_pair(dst_prim_label, ConvertValueVector(acc.ToVertex().primary_key));
std::optional<std::vector<std::pair<PropertyId, Value>>> properties_opt = {};
const auto &properties = acc.Properties(view);
auto properties = acc.Properties(view);
std::vector<std::pair<PropertyId, Value>> present_properties;
if (properties.HasValue()) {
const auto &props = properties.GetValue();
std::vector<std::pair<PropertyId, Value>> present_properties;
auto &props = properties.GetValue();
present_properties.reserve(props.size());
std::transform(props.begin(), props.end(), std::back_inserter(present_properties),
[](const auto &prop) { return std::make_pair(prop.first, FromPropertyValueToValue(prop.second)); });
properties_opt = std::move(present_properties);
[](std::pair<const PropertyId, PropertyValue> &prop) {
return std::make_pair(prop.first, FromPropertyValueToValue(std::move(prop.second)));
});
}
return Value({.src = src_vertex, .dst = dst_vertex, .properties = properties_opt, .id = gid, .type = type});
return Value(msgs::Edge{.src = std::move(src_vertex),
.dst = std::move(dst_vertex),
.properties = std::move(present_properties),
.id = gid,
.type = type});
}
Value FromTypedValueToValue(memgraph::storage::v3::TypedValue &&tv) {
using memgraph::storage::v3::TypedValue;
Value FromTypedValueToValue(TypedValue &&tv) {
switch (tv.type()) {
case TypedValue::Type::Bool:
return Value(tv.ValueBool());
@ -219,7 +217,7 @@ Value FromTypedValueToValue(memgraph::storage::v3::TypedValue &&tv) {
return Value{};
}
std::vector<Value> ConvertToValueVectorFromTypedValueVector(std::vector<memgraph::storage::v3::TypedValue> &&vec) {
std::vector<Value> ConvertToValueVectorFromTypedValueVector(std::vector<TypedValue> &&vec) {
std::vector<Value> ret;
ret.reserve(vec.size());
@ -238,9 +236,8 @@ std::vector<PropertyId> NamesToProperties(const std::vector<std::string> &proper
return properties;
}
std::vector<memgraph::storage::v3::LabelId> NamesToLabels(const std::vector<std::string> &label_names,
DbAccessor &dba) {
std::vector<memgraph::storage::v3::LabelId> labels;
std::vector<LabelId> NamesToLabels(const std::vector<std::string> &label_names, DbAccessor &dba) {
std::vector<LabelId> labels;
labels.reserve(label_names.size());
for (const auto &name : label_names) {
labels.push_back(dba.NameToLabel(name));
@ -249,8 +246,7 @@ std::vector<memgraph::storage::v3::LabelId> NamesToLabels(const std::vector<std:
}
template <class TExpression>
auto Eval(TExpression *expr, EvaluationContext &ctx, AstStorage &storage,
memgraph::storage::v3::ExpressionEvaluator &eval, DbAccessor &dba) {
auto Eval(TExpression *expr, EvaluationContext &ctx, AstStorage &storage, ExpressionEvaluator &eval, DbAccessor &dba) {
ctx.properties = NamesToProperties(storage.properties_, dba);
ctx.labels = NamesToLabels(storage.labels_, dba);
auto value = expr->Accept(eval);
@ -266,9 +262,9 @@ std::any ParseExpression(const std::string &expr, memgraph::expr::AstStorage &st
return visitor.visit(ast);
}
TypedValue ComputeExpression(DbAccessor &dba, const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc,
const std::optional<memgraph::storage::v3::EdgeAccessor> &e_acc,
const std::string &expression, std::string_view node_name, std::string_view edge_name) {
TypedValue ComputeExpression(DbAccessor &dba, const std::optional<VertexAccessor> &v_acc,
const std::optional<EdgeAccessor> &e_acc, const std::string &expression,
std::string_view node_name, std::string_view edge_name) {
AstStorage storage;
Frame frame{1 + 1}; // 1 for the node_identifier, 1 for the edge_identifier
SymbolTable symbol_table;
@ -308,18 +304,18 @@ TypedValue ComputeExpression(DbAccessor &dba, const std::optional<memgraph::stor
return Eval(std::any_cast<Expression *>(expr), ctx, storage, eval, dba);
}
bool FilterOnVertex(DbAccessor &dba, const memgraph::storage::v3::VertexAccessor &v_acc,
const std::vector<std::string> &filters, const std::string_view node_name) {
bool FilterOnVertex(DbAccessor &dba, const VertexAccessor &v_acc, const std::vector<std::string> &filters,
const std::string_view node_name) {
return std::ranges::all_of(filters, [&node_name, &dba, &v_acc](const auto &filter_expr) {
auto res = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, node_name, "");
return res.IsBool() && res.ValueBool();
});
}
std::vector<memgraph::storage::v3::TypedValue> EvaluateVertexExpressions(
DbAccessor &dba, const memgraph::storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &expressions,
std::string_view node_name) {
std::vector<memgraph::storage::v3::TypedValue> evaluated_expressions;
std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc,
const std::vector<std::string> &expressions,
std::string_view node_name) {
std::vector<TypedValue> evaluated_expressions;
evaluated_expressions.reserve(expressions.size());
std::transform(expressions.begin(), expressions.end(), std::back_inserter(evaluated_expressions),
@ -330,24 +326,21 @@ std::vector<memgraph::storage::v3::TypedValue> EvaluateVertexExpressions(
return evaluated_expressions;
}
bool DoesEdgeTypeMatch(const memgraph::msgs::ExpandOneRequest &req, const memgraph::storage::v3::EdgeAccessor &edge) {
bool DoesEdgeTypeMatch(const std::vector<msgs::EdgeType> &edge_types, const EdgeAccessor &edge) {
// TODO(gvolfing) This should be checked only once and handled accordingly.
if (req.edge_types.empty()) {
if (edge_types.empty()) {
return true;
}
return std::ranges::any_of(req.edge_types.cbegin(), req.edge_types.cend(),
[&edge](const memgraph::msgs::EdgeType &edge_type) {
return memgraph::storage::v3::EdgeTypeId::FromUint(edge_type.id) == edge.EdgeType();
});
return std::ranges::any_of(edge_types.begin(), edge_types.end(),
[&edge](const msgs::EdgeType &edge_type) { return edge_type.id == edge.EdgeType(); });
}
struct LocalError {};
std::optional<memgraph::msgs::Vertex> FillUpSourceVertex(
const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc, memgraph::msgs::ExpandOneRequest &req,
memgraph::msgs::VertexId src_vertex) {
auto secondary_labels = v_acc->Labels(View::OLD);
std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccessor> &v_acc,
const msgs::ExpandOneRequest &req, msgs::VertexId src_vertex) {
auto secondary_labels = v_acc->Labels(View::NEW);
if (secondary_labels.HasError()) {
spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}",
req.transaction_id.logical_id);
@ -355,22 +348,22 @@ std::optional<memgraph::msgs::Vertex> FillUpSourceVertex(
}
auto &sec_labels = secondary_labels.GetValue();
memgraph::msgs::Vertex source_vertex;
msgs::Vertex source_vertex;
source_vertex.id = src_vertex;
source_vertex.labels.reserve(sec_labels.size());
std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(source_vertex.labels),
[](auto label_id) { return memgraph::msgs::Label{.id = label_id}; });
[](auto label_id) { return msgs::Label{.id = label_id}; });
return source_vertex;
}
std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(
const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc, memgraph::msgs::ExpandOneRequest &req) {
std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc,
const msgs::ExpandOneRequest &req) {
std::map<PropertyId, Value> src_vertex_properties;
if (!req.src_vertex_properties) {
auto props = v_acc->Properties(View::OLD);
auto props = v_acc->Properties(View::NEW);
if (props.HasError()) {
spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}",
req.transaction_id.logical_id);
@ -378,31 +371,34 @@ std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(
}
for (auto &[key, val] : props.GetValue()) {
src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(val)));
src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val))));
}
} else if (req.src_vertex_properties.value().empty()) {
// NOOP
} else {
auto &vertex_props = req.src_vertex_properties.value();
std::transform(vertex_props.begin(), vertex_props.end(),
std::inserter(src_vertex_properties, src_vertex_properties.begin()), [&v_acc](const auto &prop) {
const auto &prop_val = v_acc->GetProperty(prop, View::OLD);
return std::make_pair(prop, FromPropertyValueToValue(prop_val.GetValue()));
});
for (const auto &prop : req.src_vertex_properties.value()) {
auto prop_val = v_acc->GetProperty(prop, View::OLD);
if (prop_val.HasError()) {
spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}",
req.transaction_id.logical_id);
return std::nullopt;
}
src_vertex_properties.insert(std::make_pair(prop, FromPropertyValueToValue(std::move(prop_val.GetValue()))));
}
}
return src_vertex_properties;
}
std::optional<std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>> FillUpConnectingEdges(
const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc, memgraph::msgs::ExpandOneRequest &req) {
std::vector<memgraph::storage::v3::EdgeAccessor> in_edges;
std::vector<memgraph::storage::v3::EdgeAccessor> out_edges;
std::optional<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges(
const std::optional<VertexAccessor> &v_acc, const msgs::ExpandOneRequest &req) {
std::vector<EdgeAccessor> in_edges;
std::vector<EdgeAccessor> out_edges;
switch (req.direction) {
case memgraph::msgs::EdgeDirection::OUT: {
auto out_edges_result = v_acc->OutEdges(View::OLD);
case msgs::EdgeDirection::OUT: {
auto out_edges_result = v_acc->OutEdges(View::NEW);
if (out_edges_result.HasError()) {
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
req.transaction_id.logical_id);
@ -411,8 +407,8 @@ std::optional<std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>> F
out_edges = std::move(out_edges_result.GetValue());
break;
}
case memgraph::msgs::EdgeDirection::IN: {
auto in_edges_result = v_acc->InEdges(View::OLD);
case msgs::EdgeDirection::IN: {
auto in_edges_result = v_acc->InEdges(View::NEW);
if (in_edges_result.HasError()) {
spdlog::debug(
"Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}"[req.transaction_id
@ -422,8 +418,8 @@ std::optional<std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>> F
in_edges = std::move(in_edges_result.GetValue());
break;
}
case memgraph::msgs::EdgeDirection::BOTH: {
auto in_edges_result = v_acc->InEdges(View::OLD);
case msgs::EdgeDirection::BOTH: {
auto in_edges_result = v_acc->InEdges(View::NEW);
if (in_edges_result.HasError()) {
spdlog::debug("Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}",
req.transaction_id.logical_id);
@ -431,7 +427,7 @@ std::optional<std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>> F
}
in_edges = std::move(in_edges_result.GetValue());
auto out_edges_result = v_acc->OutEdges(View::OLD);
auto out_edges_result = v_acc->OutEdges(View::NEW);
if (out_edges_result.HasError()) {
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
req.transaction_id.logical_id);
@ -441,115 +437,93 @@ std::optional<std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>> F
break;
}
}
return std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>{in_edges, out_edges};
return std::array<std::vector<EdgeAccessor>, 2>{in_edges, out_edges};
}
using AllEdgePropertyDataSructure = std::map<PropertyId, memgraph::msgs::Value>;
using SpecificEdgePropertyDataSructure = std::vector<memgraph::msgs::Value>;
using AllEdgePropertyDataSructure = std::map<PropertyId, msgs::Value>;
using SpecificEdgePropertyDataSructure = std::vector<msgs::Value>;
using AllEdgeProperties = std::tuple<memgraph::msgs::VertexId, memgraph::msgs::Gid, AllEdgePropertyDataSructure>;
using SpecificEdgeProperties =
std::tuple<memgraph::msgs::VertexId, memgraph::msgs::Gid, SpecificEdgePropertyDataSructure>;
using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataSructure>;
using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataSructure>;
using SpecificEdgePropertiesVector = std::vector<SpecificEdgeProperties>;
using AllEdgePropertiesVector = std::vector<AllEdgeProperties>;
template <typename ReturnType, typename EdgeProperties, typename EdgePropertyDataStructure, typename Functor>
std::optional<ReturnType> GetEdgesWithProperties(const std::vector<memgraph::storage::v3::EdgeAccessor> &edges,
const memgraph::msgs::ExpandOneRequest &req,
Functor get_edge_properties) {
ReturnType ret;
ret.reserve(edges.size());
using EdgeFiller = std::function<bool(const EdgeAccessor &edge, bool is_in_edge, msgs::ExpandOneResultRow &result_row)>;
template <bool are_in_edges>
bool FillEdges(const std::vector<EdgeAccessor> &edges, const msgs::ExpandOneRequest &req, msgs::ExpandOneResultRow &row,
const EdgeFiller &edge_filler) {
for (const auto &edge : edges) {
if (!DoesEdgeTypeMatch(req, edge)) {
if (!DoesEdgeTypeMatch(req.edge_types, edge)) {
continue;
}
EdgeProperties ret_tuple;
memgraph::msgs::Label label;
label.id = edge.FromVertex().primary_label;
memgraph::msgs::VertexId other_vertex = std::make_pair(label, ConvertValueVector(edge.FromVertex().primary_key));
const auto edge_props_var = get_edge_properties(edge);
if (std::get_if<LocalError>(&edge_props_var) != nullptr) {
return std::nullopt;
}
auto edge_props = std::get<EdgePropertyDataStructure>(edge_props_var);
memgraph::msgs::Gid gid = edge.Gid().AsUint();
ret.emplace_back(EdgeProperties{other_vertex, gid, edge_props});
}
return ret;
}
template <typename TPropertyValue, typename TPropertyNullopt>
void SetFinalEdgeProperties(std::optional<TPropertyValue> &properties_to_value,
std::optional<TPropertyNullopt> &properties_to_nullopt, const TPropertyValue &ret_out,
const TPropertyValue &ret_in, const memgraph::msgs::ExpandOneRequest &req) {
switch (req.direction) {
case memgraph::msgs::EdgeDirection::OUT: {
properties_to_value = std::move(ret_out);
break;
}
case memgraph::msgs::EdgeDirection::IN: {
properties_to_value = std::move(ret_in);
break;
}
case memgraph::msgs::EdgeDirection::BOTH: {
TPropertyValue ret;
ret.resize(ret_out.size() + ret_in.size());
ret.insert(ret.end(), std::make_move_iterator(ret_in.begin()), std::make_move_iterator(ret_in.end()));
ret.insert(ret.end(), std::make_move_iterator(ret_out.begin()), std::make_move_iterator(ret_out.end()));
properties_to_value = ret;
break;
if (!edge_filler(edge, are_in_edges, row)) {
return false;
}
}
properties_to_nullopt = {};
return true;
}
std::optional<memgraph::msgs::ExpandOneResultRow> GetExpandOneResult(memgraph::storage::v3::Shard::Accessor &acc,
memgraph::msgs::VertexId src_vertex,
memgraph::msgs::ExpandOneRequest req) {
using EdgeProperties =
std::variant<LocalError, std::map<PropertyId, memgraph::msgs::Value>, std::vector<memgraph::msgs::Value>>;
std::function<EdgeProperties(const memgraph::storage::v3::EdgeAccessor &)> get_edge_properties;
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(Shard::Accessor &acc, msgs::VertexId src_vertex,
const msgs::ExpandOneRequest &req) {
EdgeFiller edge_filler;
if (!req.edge_properties) {
get_edge_properties = [&req](const memgraph::storage::v3::EdgeAccessor &edge) -> EdgeProperties {
std::map<PropertyId, memgraph::msgs::Value> ret;
auto property_results = edge.Properties(View::OLD);
if (property_results.HasError()) {
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
req.transaction_id.logical_id);
return LocalError{};
edge_filler = [transaction_id = req.transaction_id.logical_id](const EdgeAccessor &edge, const bool is_in_edge,
msgs::ExpandOneResultRow &result_row) -> bool {
auto properties_results = edge.Properties(View::NEW);
if (properties_results.HasError()) {
spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", transaction_id);
return false;
}
for (const auto &[prop_key, prop_val] : property_results.GetValue()) {
ret.insert(std::make_pair(prop_key, FromPropertyValueToValue(prop_val)));
std::map<PropertyId, msgs::Value> value_properties;
for (auto &[prop_key, prop_val] : properties_results.GetValue()) {
value_properties.insert(std::make_pair(prop_key, FromPropertyValueToValue(std::move(prop_val))));
}
return ret;
using EdgeWithAllProperties = msgs::ExpandOneResultRow::EdgeWithAllProperties;
EdgeWithAllProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()},
edge.Gid().AsUint(), std::move(value_properties)};
if (is_in_edge) {
result_row.in_edges_with_all_properties.push_back(std::move(edges));
} else {
result_row.out_edges_with_all_properties.push_back(std::move(edges));
}
return true;
};
} else {
// TODO(gvolfing) - do we want to set the action_successful here?
get_edge_properties = [&req](const memgraph::storage::v3::EdgeAccessor &edge) {
std::vector<memgraph::msgs::Value> ret;
ret.reserve(req.edge_properties.value().size());
edge_filler = [&req](const EdgeAccessor &edge, const bool is_in_edge,
msgs::ExpandOneResultRow &result_row) -> bool {
std::vector<msgs::Value> value_properties;
value_properties.reserve(req.edge_properties.value().size());
for (const auto &edge_prop : req.edge_properties.value()) {
// TODO(gvolfing) maybe check for the absence of certain properties
ret.emplace_back(FromPropertyValueToValue(edge.GetProperty(edge_prop, View::OLD).GetValue()));
auto property_result = edge.GetProperty(edge_prop, View::NEW);
if (property_result.HasError()) {
spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}",
req.transaction_id.logical_id);
return false;
}
value_properties.emplace_back(FromPropertyValueToValue(std::move(property_result.GetValue())));
}
return ret;
using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties;
EdgeWithSpecificProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()},
edge.Gid().AsUint(), std::move(value_properties)};
if (is_in_edge) {
result_row.in_edges_with_specific_properties.push_back(std::move(edges));
} else {
result_row.out_edges_with_specific_properties.push_back(std::move(edges));
}
return true;
};
}
/// Fill up source vertex
auto v_acc = acc.FindVertex(ConvertPropertyVector(std::move(src_vertex.second)), View::OLD);
const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second));
auto v_acc = acc.FindVertex(primary_key, View::NEW);
auto source_vertex = FillUpSourceVertex(v_acc, req, src_vertex);
if (!source_vertex) {
@ -570,55 +544,19 @@ std::optional<memgraph::msgs::ExpandOneResultRow> GetExpandOneResult(memgraph::s
auto [in_edges, out_edges] = fill_up_connecting_edges.value();
/// Assemble the edge properties
std::optional<AllEdgePropertiesVector> edges_with_all_properties;
std::optional<SpecificEdgePropertiesVector> edges_with_specific_properties;
if (!req.edge_properties) {
auto ret_in_opt = GetEdgesWithProperties<AllEdgePropertiesVector, AllEdgeProperties, AllEdgePropertyDataSructure>(
in_edges, req, get_edge_properties);
if (!ret_in_opt) {
return std::nullopt;
}
auto ret_out_opt = GetEdgesWithProperties<AllEdgePropertiesVector, AllEdgeProperties, AllEdgePropertyDataSructure>(
out_edges, req, get_edge_properties);
if (!ret_out_opt) {
return std::nullopt;
}
auto &ret_in = *ret_in_opt;
auto &ret_out = *ret_out_opt;
SetFinalEdgeProperties<AllEdgePropertiesVector, SpecificEdgePropertiesVector>(
edges_with_all_properties, edges_with_specific_properties, ret_out, ret_in, req);
} else {
auto ret_in_opt =
GetEdgesWithProperties<SpecificEdgePropertiesVector, SpecificEdgeProperties, SpecificEdgePropertyDataSructure>(
in_edges, req, get_edge_properties);
if (!ret_in_opt) {
return std::nullopt;
}
auto ret_out_opt =
GetEdgesWithProperties<SpecificEdgePropertiesVector, SpecificEdgeProperties, SpecificEdgePropertyDataSructure>(
out_edges, req, get_edge_properties);
if (!ret_out_opt) {
return std::nullopt;
}
auto &ret_in = *ret_in_opt;
auto &ret_out = *ret_out_opt;
SetFinalEdgeProperties<SpecificEdgePropertiesVector, AllEdgePropertiesVector>(
edges_with_specific_properties, edges_with_all_properties, ret_out, ret_in, req);
msgs::ExpandOneResultRow result_row;
result_row.src_vertex = std::move(*source_vertex);
result_row.src_vertex_properties = std::move(*src_vertex_properties);
static constexpr bool kInEdges = true;
static constexpr bool kOutEdges = false;
if (!in_edges.empty() && !FillEdges<kInEdges>(in_edges, req, result_row, edge_filler)) {
return std::nullopt;
}
if (!out_edges.empty() && !FillEdges<kOutEdges>(out_edges, req, result_row, edge_filler)) {
return std::nullopt;
}
return memgraph::msgs::ExpandOneResultRow{
.src_vertex = std::move(*source_vertex),
.src_vertex_properties = std::move(src_vertex_properties),
.edges_with_all_properties = std::move(edges_with_all_properties),
.edges_with_specific_properties = std::move(edges_with_specific_properties)};
return result_row;
}
}; // namespace
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) {
@ -637,7 +575,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) {
auto converted_property_map = ConvertPropertyMap(std::move(new_vertex.properties));
// TODO(gvolfing) make sure if this conversion is actually needed.
std::vector<memgraph::storage::v3::LabelId> converted_label_ids;
std::vector<LabelId> converted_label_ids;
converted_label_ids.reserve(new_vertex.label_ids.size());
std::transform(new_vertex.label_ids.begin(), new_vertex.label_ids.end(), std::back_inserter(converted_label_ids),
@ -671,7 +609,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) {
}
}
return memgraph::msgs::CreateVerticesResponse{.success = action_successful};
return msgs::CreateVerticesResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateVerticesRequest &&req) {
@ -720,7 +658,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateVerticesRequest &&req) {
}
}
return memgraph::msgs::UpdateVerticesResponse{.success = action_successful};
return msgs::UpdateVerticesResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) {
@ -743,7 +681,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) {
// Since we will not have different kinds of deletion types in one transaction,
// we dont have to enter the switch statement on every iteration. Optimize this.
switch (req.deletion_type) {
case memgraph::msgs::DeleteVerticesRequest::DeletionType::DELETE: {
case msgs::DeleteVerticesRequest::DeletionType::DELETE: {
auto result = acc.DeleteVertex(&vertex_acc.value());
if (result.HasError() || !(result.GetValue().has_value())) {
action_successful = false;
@ -752,7 +690,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) {
break;
}
case memgraph::msgs::DeleteVerticesRequest::DeletionType::DETACH_DELETE: {
case msgs::DeleteVerticesRequest::DeletionType::DETACH_DELETE: {
auto result = acc.DetachDeleteVertex(&vertex_acc.value());
if (result.HasError() || !(result.GetValue().has_value())) {
action_successful = false;
@ -766,7 +704,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) {
}
}
return memgraph::msgs::DeleteVerticesResponse{.success = action_successful};
return msgs::DeleteVerticesResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateExpandRequest &&req) {
@ -774,25 +712,20 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateExpandRequest &&req) {
bool action_successful = true;
for (auto &new_expand : req.new_expands) {
auto vertex_acc_from_primary_key = new_expand.src_vertex.second;
auto vertex_from_acc = acc.FindVertex(ConvertPropertyVector(std::move(vertex_acc_from_primary_key)), View::OLD);
const auto from_vertex_id =
v3::VertexId{new_expand.src_vertex.first.id, ConvertPropertyVector(std::move(new_expand.src_vertex.second))};
auto vertex_acc_to_primary_key = new_expand.dest_vertex.second;
auto vertex_to_acc = acc.FindVertex(ConvertPropertyVector(std::move(vertex_acc_to_primary_key)), View::OLD);
const auto to_vertex_id =
VertexId{new_expand.dest_vertex.first.id, ConvertPropertyVector(std::move(new_expand.dest_vertex.second))};
if (!(vertex_from_acc || vertex_to_acc)) {
if (!(shard_->IsVertexBelongToShard(from_vertex_id) || shard_->IsVertexBelongToShard(to_vertex_id))) {
action_successful = false;
spdlog::debug("Error while trying to insert edge, vertex does not exist. Transaction id: {}",
spdlog::debug("Error while trying to insert edge, none of the vertices belong to this shard. Transaction id: {}",
req.transaction_id.logical_id);
break;
}
auto from_vertex_id =
VertexId(new_expand.src_vertex.first.id, ConvertPropertyVector(std::move(new_expand.src_vertex.second)));
auto to_vertex_id =
VertexId(new_expand.dest_vertex.first.id, ConvertPropertyVector(std::move(new_expand.dest_vertex.second)));
auto edge_acc = acc.CreateEdge(from_vertex_id, to_vertex_id, EdgeTypeId::FromUint(new_expand.type.id),
Gid::FromUint(new_expand.id.gid));
auto edge_acc = acc.CreateEdge(from_vertex_id, to_vertex_id, new_expand.type.id, Gid::FromUint(new_expand.id.gid));
if (edge_acc.HasValue()) {
auto edge = edge_acc.GetValue();
if (!new_expand.properties.empty()) {
@ -828,7 +761,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateExpandRequest &&req) {
}
}
return memgraph::msgs::CreateExpandResponse{.success = action_successful};
return msgs::CreateExpandResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteEdgesRequest &&req) {
@ -850,10 +783,11 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteEdgesRequest &&req) {
}
}
return memgraph::msgs::DeleteEdgesResponse{.success = action_successful};
return msgs::DeleteEdgesResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateEdgesRequest &&req) {
// TODO(antaljanosbenjamin): handle when the vertex is the destination vertex
auto acc = shard_->Access(req.transaction_id);
bool action_successful = true;
@ -908,15 +842,15 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateEdgesRequest &&req) {
}
}
return memgraph::msgs::UpdateEdgesResponse{.success = action_successful};
return msgs::UpdateEdgesResponse{.success = action_successful};
}
msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
auto acc = shard_->Access(req.transaction_id);
bool action_successful = true;
std::vector<memgraph::msgs::ScanResultRow> results;
std::optional<memgraph::msgs::VertexId> next_start_id;
std::vector<msgs::ScanResultRow> results;
std::optional<msgs::VertexId> next_start_id;
const auto view = View(req.storage_view);
auto vertex_iterable = acc.Vertices(view);
@ -988,7 +922,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
}
}
memgraph::msgs::ScanVerticesResponse resp{};
msgs::ScanVerticesResponse resp{};
resp.success = action_successful;
if (action_successful) {
@ -1003,7 +937,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
auto acc = shard_->Access(req.transaction_id);
bool action_successful = true;
std::vector<memgraph::msgs::ExpandOneResultRow> results;
std::vector<msgs::ExpandOneResultRow> results;
for (auto &src_vertex : req.src_vertices) {
auto result = GetExpandOneResult(acc, src_vertex, req);
@ -1016,7 +950,8 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
results.emplace_back(result.value());
}
memgraph::msgs::ExpandOneResponse resp{};
msgs::ExpandOneResponse resp{};
resp.success = action_successful;
if (action_successful) {
resp.result = std::move(results);
}
@ -1026,12 +961,12 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) {
shard_->Access(req.transaction_id).Commit(req.commit_timestamp);
return memgraph::msgs::CommitResponse{true};
return msgs::CommitResponse{true};
};
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest && /*req*/) {
return memgraph::msgs::GetPropertiesResponse{};
return msgs::GetPropertiesResponse{};
}
} // namespace memgraph::storage::v3

View File

@ -10,6 +10,7 @@
// licenses/APL.txt.
#include "query/v2/requests.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/vertex_id.hpp"
#include "utils/logging.hpp"
#include <map>
@ -29,8 +30,8 @@ using memgraph::msgs::Value;
using memgraph::msgs::VertexId;
// TODO(gvolfing use come algorithm instead of explicit for loops)
inline memgraph::storage::v3::PropertyValue ToPropertyValue(Value value) {
using PV = memgraph::storage::v3::PropertyValue;
inline v3::PropertyValue ToPropertyValue(Value value) {
using PV = v3::PropertyValue;
PV ret;
switch (value.type) {
case Value::Type::Null:
@ -65,7 +66,7 @@ inline memgraph::storage::v3::PropertyValue ToPropertyValue(Value value) {
return ret;
}
inline Value FromPropertyValueToValue(const memgraph::storage::v3::PropertyValue &pv) {
inline Value FromPropertyValueToValue(memgraph::storage::v3::PropertyValue &&pv) {
using memgraph::storage::v3::PropertyValue;
switch (pv.type()) {
@ -78,17 +79,17 @@ inline Value FromPropertyValueToValue(const memgraph::storage::v3::PropertyValue
case PropertyValue::Type::List: {
std::vector<Value> list;
list.reserve(pv.ValueList().size());
for (const auto &elem : pv.ValueList()) {
list.emplace_back(FromPropertyValueToValue(elem));
for (auto &elem : pv.ValueList()) {
list.emplace_back(FromPropertyValueToValue(std::move(elem)));
}
return Value(list);
}
case PropertyValue::Type::Map: {
std::map<std::string, Value> map;
for (const auto &[key, val] : pv.ValueMap()) {
for (auto &[key, val] : pv.ValueMap()) {
// maybe use std::make_pair once the && issue is resolved.
map.emplace(key, FromPropertyValueToValue(val));
map.emplace(key, FromPropertyValueToValue(std::move(val)));
}
return Value(map);
@ -96,7 +97,7 @@ inline Value FromPropertyValueToValue(const memgraph::storage::v3::PropertyValue
case PropertyValue::Type::Null:
return Value{};
case PropertyValue::Type::String:
return Value(pv.ValueString());
return Value(std::move(pv.ValueString()));
case PropertyValue::Type::TemporalData: {
// TBD -> we need to specify this in the messages, not a priority.
MG_ASSERT(false, "Temporal datatypes are not yet implemented on Value!");
@ -105,8 +106,8 @@ inline Value FromPropertyValueToValue(const memgraph::storage::v3::PropertyValue
}
}
inline std::vector<memgraph::storage::v3::PropertyValue> ConvertPropertyVector(std::vector<Value> vec) {
std::vector<memgraph::storage::v3::PropertyValue> ret;
inline std::vector<v3::PropertyValue> ConvertPropertyVector(std::vector<Value> vec) {
std::vector<v3::PropertyValue> ret;
ret.reserve(vec.size());
for (auto &elem : vec) {
@ -116,15 +117,18 @@ inline std::vector<memgraph::storage::v3::PropertyValue> ConvertPropertyVector(s
return ret;
}
inline std::vector<Value> ConvertValueVector(const std::vector<memgraph::storage::v3::PropertyValue> &vec) {
inline std::vector<Value> ConvertValueVector(const std::vector<v3::PropertyValue> &vec) {
std::vector<Value> ret;
ret.reserve(vec.size());
for (const auto &elem : vec) {
ret.push_back(FromPropertyValueToValue(elem));
ret.push_back(FromPropertyValueToValue(v3::PropertyValue{elem}));
}
return ret;
}
inline msgs::VertexId ToMsgsVertexId(const v3::VertexId &vertex_id) {
return {msgs::Label{vertex_id.primary_label}, ConvertValueVector(vertex_id.primary_key)};
}
} // namespace memgraph::storage::conversions

View File

@ -453,6 +453,7 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::Properties(View view
Delta *delta = nullptr;
{
deleted = vertex_->deleted;
// TODO(antaljanosbenjamin): This should also return the primary key
properties = vertex_->properties.Properties();
delta = vertex_->delta;
}

View File

@ -48,15 +48,28 @@ def test_vertex_creation_and_scanall(connection):
wait_for_shard_manager_to_initialize()
cursor = connection.cursor()
assert has_n_result_row(cursor, "CREATE (n :label {property:1, asd:2})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:2, asd:2})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:3, asd:2})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:4, asd:2})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:5, asd:2})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0)
assert has_n_result_row(cursor, "MATCH (n) RETURN n", 5)
assert has_n_result_row(cursor, "MATCH (n) RETURN *", 5)
assert has_n_result_row(cursor, "MATCH (n :label) RETURN *", 5)
assert has_n_result_row(cursor, "MATCH (n) RETURN n", 3)
assert has_n_result_row(cursor, "MATCH (n) RETURN *", 3)
assert has_n_result_row(cursor, "MATCH (n :label) RETURN *", 3)
assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0)
results = execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n,r,m")
assert len(results) == 9
for (n, r, m) in results:
n_props = n.properties
assert len(n_props) == 0, "n is not expected to have properties, update the test!"
assert len(n.labels) == 0, "n is not expected to have labels, update the test!"
assert r.type == "TO"
m_props = m.properties
assert m_props["property"] <= 3 and m_props["property"] >= 0, "Wrong key"
assert len(m.labels) == 0, "m is not expected to have labels, update the test!"
if __name__ == "__main__":

View File

@ -43,26 +43,18 @@
#include "storage/v3/value_conversions.hpp"
#include "utils/logging.hpp"
using memgraph::coordinator::Hlc;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::msgs::CreateVerticesRequest;
using memgraph::msgs::CreateVerticesResponse;
using memgraph::msgs::ExpandOneRequest;
using memgraph::msgs::ExpandOneResponse;
using memgraph::msgs::ListedValues;
using memgraph::msgs::ScanVerticesRequest;
using memgraph::msgs::ScanVerticesResponse;
using memgraph::msgs::Value;
using memgraph::msgs::VertexId;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyValue;
namespace memgraph::storage::v3::tests {
using coordinator::Hlc;
using io::rsm::StorageWriteRequest;
using io::rsm::StorageWriteResponse;
using io::simulator::Simulator;
using io::simulator::SimulatorConfig;
using io::simulator::SimulatorStats;
using io::simulator::SimulatorTransport;
using storage::v3::LabelId;
using storage::v3::PropertyValue;
using ShardRsmKey = std::vector<memgraph::storage::v3::PropertyValue>;
using ShardRsmKey = std::vector<storage::v3::PropertyValue>;
class MockedShardRsm {
std::map<ShardRsmKey, int> state_;
@ -79,32 +71,37 @@ class MockedShardRsm {
}
public:
using ReadRequests = msgs::ReadRequests;
using ReadResponses = msgs::ReadResponses;
using WriteRequests = msgs::WriteRequests;
using WriteResponses = msgs::WriteResponses;
// ExpandOneResponse Read(ExpandOneRequest rqst);
// GetPropertiesResponse Read(GetPropertiesRequest rqst);
ScanVerticesResponse ReadImpl(ScanVerticesRequest rqst) {
ScanVerticesResponse ret;
auto as_prop_val = memgraph::storage::conversions::ConvertPropertyVector(rqst.start_id.second);
msgs::ScanVerticesResponse ReadImpl(msgs::ScanVerticesRequest rqst) {
msgs::ScanVerticesResponse ret;
auto as_prop_val = storage::conversions::ConvertPropertyVector(rqst.start_id.second);
if (!IsKeyInRange(as_prop_val)) {
ret.success = false;
} else if (as_prop_val == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) {
Value val(int64_t(0));
ret.next_start_id = std::make_optional<VertexId>();
msgs::Value val(int64_t(0));
ret.next_start_id = std::make_optional<msgs::VertexId>();
ret.next_start_id->second =
memgraph::storage::conversions::ConvertValueVector(ShardRsmKey{PropertyValue(1), PropertyValue(0)});
memgraph::msgs::ScanResultRow result;
result.props.push_back(std::make_pair(memgraph::msgs::PropertyId::FromUint(0), val));
storage::conversions::ConvertValueVector(ShardRsmKey{PropertyValue(1), PropertyValue(0)});
msgs::ScanResultRow result;
result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val));
ret.results.push_back(std::move(result));
ret.success = true;
} else if (as_prop_val == ShardRsmKey{PropertyValue(1), PropertyValue(0)}) {
memgraph::msgs::ScanResultRow result;
Value val(int64_t(1));
result.props.push_back(std::make_pair(memgraph::msgs::PropertyId::FromUint(0), val));
msgs::ScanResultRow result;
msgs::Value val(int64_t(1));
result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val));
ret.results.push_back(std::move(result));
ret.success = true;
} else if (as_prop_val == ShardRsmKey{PropertyValue(12), PropertyValue(13)}) {
memgraph::msgs::ScanResultRow result;
Value val(int64_t(444));
result.props.push_back(std::make_pair(memgraph::msgs::PropertyId::FromUint(0), val));
msgs::ScanResultRow result;
msgs::Value val(int64_t(444));
result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val));
ret.results.push_back(std::move(result));
ret.success = true;
} else {
@ -113,14 +110,25 @@ class MockedShardRsm {
return ret;
}
ExpandOneResponse ReadImpl(ExpandOneRequest rqst) { return {}; }
using ReadRequests = std::variant<ScanVerticesRequest, ExpandOneRequest>;
using ReadResponses = std::variant<ScanVerticesResponse, ExpandOneResponse>;
msgs::ExpandOneResponse ReadImpl(msgs::ExpandOneRequest rqst) { return {}; }
msgs::ExpandOneResponse ReadImpl(msgs::GetPropertiesRequest rqst) { return {}; }
ReadResponses Read(ReadRequests read_requests) {
return {std::visit([this](auto &&request) { return ReadResponses{ReadImpl(std::move(request))}; },
return {std::visit([this]<typename T>(T &&request) { return ReadResponses{ReadImpl(std::forward<T>(request))}; },
std::move(read_requests))};
}
CreateVerticesResponse Apply(CreateVerticesRequest request) { return CreateVerticesResponse{.success = true}; }
msgs::CreateVerticesResponse ApplyImpl(msgs::CreateVerticesRequest rqst) { return {.success = true}; }
msgs::DeleteVerticesResponse ApplyImpl(msgs::DeleteVerticesRequest rqst) { return {}; }
msgs::UpdateVerticesResponse ApplyImpl(msgs::UpdateVerticesRequest rqst) { return {}; }
msgs::CreateExpandResponse ApplyImpl(msgs::CreateExpandRequest rqst) { return {.success = true}; }
msgs::DeleteEdgesResponse ApplyImpl(msgs::DeleteEdgesRequest rqst) { return {}; }
msgs::UpdateEdgesResponse ApplyImpl(msgs::UpdateEdgesRequest rqst) { return {}; }
msgs::CommitResponse ApplyImpl(msgs::CommitRequest rqst) { return {}; }
WriteResponses Apply(WriteRequests write_requests) {
return {std::visit([this]<typename T>(T &&request) { return WriteResponses{ApplyImpl(std::forward<T>(request))}; },
std::move(write_requests))};
}
};
} // namespace memgraph::storage::v3::tests

View File

@ -36,51 +36,51 @@
#include "storage/v3/property_value.hpp"
#include "utils/result.hpp"
using memgraph::coordinator::AddressAndStatus;
using CompoundKey = memgraph::coordinator::PrimaryKey;
using memgraph::coordinator::Coordinator;
using memgraph::coordinator::CoordinatorClient;
using memgraph::coordinator::CoordinatorRsm;
using memgraph::coordinator::HlcRequest;
using memgraph::coordinator::HlcResponse;
using memgraph::coordinator::Shard;
using memgraph::coordinator::ShardMap;
using memgraph::coordinator::Shards;
using memgraph::coordinator::Status;
using memgraph::io::Address;
using memgraph::io::Io;
using memgraph::io::ResponseEnvelope;
using memgraph::io::ResponseFuture;
using memgraph::io::Time;
using memgraph::io::TimedOut;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::ReadResponse;
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageReadResponse;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::msgs::CreateVerticesRequest;
using memgraph::msgs::CreateVerticesResponse;
using memgraph::msgs::ListedValues;
using memgraph::msgs::NewVertexLabel;
using memgraph::msgs::ScanVerticesRequest;
using memgraph::msgs::ScanVerticesResponse;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::SchemaProperty;
using memgraph::utils::BasicResult;
namespace memgraph::query::v2::tests {
using coordinator::AddressAndStatus;
using CompoundKey = coordinator::PrimaryKey;
using coordinator::Coordinator;
using coordinator::CoordinatorClient;
using coordinator::CoordinatorRsm;
using coordinator::HlcRequest;
using coordinator::HlcResponse;
using coordinator::Shard;
using coordinator::ShardMap;
using coordinator::Shards;
using coordinator::Status;
using io::Address;
using io::Io;
using io::ResponseEnvelope;
using io::ResponseFuture;
using io::Time;
using io::TimedOut;
using io::rsm::Raft;
using io::rsm::ReadRequest;
using io::rsm::ReadResponse;
using io::rsm::StorageReadRequest;
using io::rsm::StorageReadResponse;
using io::rsm::StorageWriteRequest;
using io::rsm::StorageWriteResponse;
using io::rsm::WriteRequest;
using io::rsm::WriteResponse;
using io::simulator::Simulator;
using io::simulator::SimulatorConfig;
using io::simulator::SimulatorStats;
using io::simulator::SimulatorTransport;
using msgs::CreateVerticesRequest;
using msgs::CreateVerticesResponse;
using msgs::ScanVerticesRequest;
using msgs::ScanVerticesResponse;
using msgs::VertexId;
using storage::v3::LabelId;
using storage::v3::SchemaProperty;
using storage::v3::tests::MockedShardRsm;
using utils::BasicResult;
namespace {
ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::coordinator::Address a_io_2,
memgraph::coordinator::Address a_io_3, memgraph::coordinator::Address b_io_1,
memgraph::coordinator::Address b_io_2, memgraph::coordinator::Address b_io_3) {
ShardMap CreateDummyShardmap(coordinator::Address a_io_1, coordinator::Address a_io_2, coordinator::Address a_io_3,
coordinator::Address b_io_1, coordinator::Address b_io_2, coordinator::Address b_io_3) {
static const std::string label_name = std::string("test_label");
ShardMap sm;
@ -89,8 +89,8 @@ ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::co
const auto properties = sm.AllocatePropertyIds(property_names);
const auto property_id_1 = properties.at("property_1");
const auto property_id_2 = properties.at("property_2");
const auto type_1 = memgraph::common::SchemaType::INT;
const auto type_2 = memgraph::common::SchemaType::INT;
const auto type_1 = common::SchemaType::INT;
const auto type_2 = common::SchemaType::INT;
// register new label space
std::vector<SchemaProperty> schema = {
@ -113,8 +113,8 @@ ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::co
Shard shard1 = {aas1_1, aas1_2, aas1_3};
auto key1 = memgraph::storage::v3::PropertyValue(0);
auto key2 = memgraph::storage::v3::PropertyValue(0);
auto key1 = storage::v3::PropertyValue(0);
auto key2 = storage::v3::PropertyValue(0);
CompoundKey compound_key_1 = {key1, key2};
shards_for_label[compound_key_1] = shard1;
@ -125,20 +125,22 @@ ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::co
Shard shard2 = {aas2_1, aas2_2, aas2_3};
auto key3 = memgraph::storage::v3::PropertyValue(12);
auto key4 = memgraph::storage::v3::PropertyValue(13);
auto key3 = storage::v3::PropertyValue(12);
auto key4 = storage::v3::PropertyValue(13);
CompoundKey compound_key_2 = {key3, key4};
shards_for_label[compound_key_2] = shard2;
sm.AllocateEdgeTypeIds(std::vector<coordinator::EdgeTypeName>{"edge_type"});
return sm;
}
} // namespace
using WriteRequests = CreateVerticesRequest;
using WriteResponses = CreateVerticesResponse;
using ReadRequests = std::variant<ScanVerticesRequest, ExpandOneRequest>;
using ReadResponses = std::variant<ScanVerticesResponse, ExpandOneResponse>;
using WriteRequests = msgs::WriteRequests;
using WriteResponses = msgs::WriteResponses;
using ReadRequests = msgs::ReadRequests;
using ReadResponses = msgs::ReadResponses;
using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>;
using ConcreteStorageRsm =
@ -149,40 +151,34 @@ void RunStorageRaft(Raft<IoImpl, MockedShardRsm, WriteRequests, WriteResponses,
server.Run();
}
template <typename ShardRequestManager>
void TestScanAll(ShardRequestManager &io) {
memgraph::msgs::ExecutionState<ScanVerticesRequest> state{.label = "test_label"};
void TestScanVertices(msgs::ShardRequestManagerInterface &io) {
msgs::ExecutionState<ScanVerticesRequest> state{.label = "test_label"};
auto result = io.Request(state);
MG_ASSERT(result.size() == 2);
{
auto prop = result[0].GetProperty(memgraph::msgs::PropertyId::FromUint(0));
auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0));
MG_ASSERT(prop.int_v == 0);
prop = result[1].GetProperty(memgraph::msgs::PropertyId::FromUint(0));
prop = result[1].GetProperty(msgs::PropertyId::FromUint(0));
MG_ASSERT(prop.int_v == 444);
}
result = io.Request(state);
{
MG_ASSERT(result.size() == 1);
auto prop = result[0].GetProperty(memgraph::msgs::PropertyId::FromUint(0));
auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0));
MG_ASSERT(prop.int_v == 1);
}
// Exhaust it, request should be empty
result = io.Request(state);
MG_ASSERT(result.size() == 0);
}
template <typename ShardRequestManager>
void TestCreateVertices(ShardRequestManager &io) {
using PropVal = memgraph::msgs::Value;
memgraph::msgs::ExecutionState<CreateVerticesRequest> state;
std::vector<memgraph::msgs::NewVertex> new_vertices;
auto label_id = io.LabelNameToLabelId("test_label");
memgraph::msgs::NewVertex a1{.primary_key = {PropVal(int64_t(1)), PropVal(int64_t(0))}};
void TestCreateVertices(msgs::ShardRequestManagerInterface &io) {
using PropVal = msgs::Value;
msgs::ExecutionState<CreateVerticesRequest> state;
std::vector<msgs::NewVertex> new_vertices;
auto label_id = io.NameToLabel("test_label");
msgs::NewVertex a1{.primary_key = {PropVal(int64_t(1)), PropVal(int64_t(0))}};
a1.label_ids.push_back({label_id});
memgraph::msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}};
msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}};
a2.label_ids.push_back({label_id});
new_vertices.push_back(std::move(a1));
new_vertices.push_back(std::move(a2));
@ -191,151 +187,176 @@ void TestCreateVertices(ShardRequestManager &io) {
MG_ASSERT(result.size() == 2);
}
template <typename ShardRequestManager>
void TestCreateExpand(ShardRequestManager &io) {
using PropVal = memgraph::msgs::Value;
memgraph::msgs::ExecutionState<memgraph::msgs::CreateExpandRequest> state;
std::vector<memgraph::msgs::NewExpand> new_expands;
void TestCreateExpand(msgs::ShardRequestManagerInterface &io) {
using PropVal = msgs::Value;
msgs::ExecutionState<msgs::CreateExpandRequest> state;
std::vector<msgs::NewExpand> new_expands;
const auto edge_type_id = io.NameToEdgeType("edge_type");
const auto label_id = io.LabelNameToLabelId("test_label");
const VertexId vertex_id_1{label_id, {PropVal(int64_t(1)), PropVal(int64_t(0))}};
const VertexId vertex_id_2{label_id, {PropVal(int64_t(13)), PropVal(int64_t(13))}};
memgraph::msgs::NewExpand expand_1{
.id = 0, .type = edge_type_id, .src_vertex = vertex_id_1, .dest_vertex = vertex_id_2};
memgraph::msgs::NewExpand expand_2{
.id = 1, .type = edge_type_id, .src_vertex = vertex_id_2, .dest_vertex = vertex_id_1};
const auto label = msgs::Label{io.NameToLabel("test_label")};
const msgs::VertexId vertex_id_1{label, {PropVal(int64_t(0)), PropVal(int64_t(0))}};
const msgs::VertexId vertex_id_2{label, {PropVal(int64_t(13)), PropVal(int64_t(13))}};
msgs::NewExpand expand_1{
.id = {.gid = 0}, .type = {edge_type_id}, .src_vertex = vertex_id_1, .dest_vertex = vertex_id_2};
msgs::NewExpand expand_2{
.id = {.gid = 1}, .type = {edge_type_id}, .src_vertex = vertex_id_2, .dest_vertex = vertex_id_1};
new_expands.push_back(std::move(expand_1));
new_expands.push_back(std::move(expand_2));
auto result = io.Request(state, std::move(new_expands));
MG_ASSERT(result.size() == 2);
auto responses = io.Request(state, std::move(new_expands));
MG_ASSERT(responses.size() == 2);
MG_ASSERT(responses[0].success);
MG_ASSERT(responses[1].success);
}
template <typename ShardRequestManager>
void TestExpand(ShardRequestManager &io) {}
void TestExpandOne(msgs::ShardRequestManagerInterface &shard_request_manager) {
msgs::ExecutionState<msgs::ExpandOneRequest> state{};
msgs::ExpandOneRequest request;
const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type");
const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")};
request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}});
request.edge_types.push_back(msgs::EdgeType{edge_type_id});
request.direction = msgs::EdgeDirection::BOTH;
auto result_rows = shard_request_manager.Request(state, std::move(request));
MG_ASSERT(result_rows.size() == 2);
}
template <typename ShardRequestManager>
void TestAggregate(ShardRequestManager &io) {}
int main() {
// SimulatorConfig config{
// .drop_percent = 0,
// .perform_timeouts = false,
// .scramble_messages = false,
// .rng_seed = 0,
// .start_time = Time::min() + std::chrono::microseconds{256 * 1024},
// .abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024},
// };
void DoTest() {
SimulatorConfig config{
.drop_percent = 0,
.perform_timeouts = false,
.scramble_messages = false,
.rng_seed = 0,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024},
};
// auto simulator = Simulator(config);
// const auto one_second = std::chrono::seconds(1);
auto simulator = Simulator(config);
const auto one_second = std::chrono::seconds(1);
// Io<SimulatorTransport> cli_io = simulator.RegisterNew();
// cli_io.SetDefaultTimeout(one_second);
Io<SimulatorTransport> cli_io = simulator.RegisterNew();
cli_io.SetDefaultTimeout(one_second);
// // Register
// Io<SimulatorTransport> a_io_1 = simulator.RegisterNew();
// a_io_1.SetDefaultTimeout(one_second);
// Io<SimulatorTransport> a_io_2 = simulator.RegisterNew();
// a_io_2.SetDefaultTimeout(one_second);
// Io<SimulatorTransport> a_io_3 = simulator.RegisterNew();
// a_io_3.SetDefaultTimeout(one_second);
// Register
Io<SimulatorTransport> a_io_1 = simulator.RegisterNew();
a_io_1.SetDefaultTimeout(one_second);
Io<SimulatorTransport> a_io_2 = simulator.RegisterNew();
a_io_2.SetDefaultTimeout(one_second);
Io<SimulatorTransport> a_io_3 = simulator.RegisterNew();
a_io_3.SetDefaultTimeout(one_second);
// Io<SimulatorTransport> b_io_1 = simulator.RegisterNew();
// b_io_1.SetDefaultTimeout(one_second);
// Io<SimulatorTransport> b_io_2 = simulator.RegisterNew();
// b_io_2.SetDefaultTimeout(one_second);
// Io<SimulatorTransport> b_io_3 = simulator.RegisterNew();
// b_io_3.SetDefaultTimeout(one_second);
Io<SimulatorTransport> b_io_1 = simulator.RegisterNew();
b_io_1.SetDefaultTimeout(one_second);
Io<SimulatorTransport> b_io_2 = simulator.RegisterNew();
b_io_2.SetDefaultTimeout(one_second);
Io<SimulatorTransport> b_io_3 = simulator.RegisterNew();
b_io_3.SetDefaultTimeout(one_second);
// // Preconfigure coordinator with kv shard 'A' and 'B'
// auto sm1 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
// b_io_2.GetAddress(), b_io_3.GetAddress());
// auto sm2 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
// b_io_2.GetAddress(), b_io_3.GetAddress());
// auto sm3 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
// b_io_2.GetAddress(), b_io_3.GetAddress());
// Preconfigure coordinator with kv shard 'A' and 'B'
auto sm1 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress());
auto sm2 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress());
auto sm3 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress());
// // Spin up shard A
// std::vector<Address> a_addrs = {a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress()};
// Spin up shard A
std::vector<Address> a_addrs = {a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress()};
// std::vector<Address> a_1_peers = {a_addrs[1], a_addrs[2]};
// std::vector<Address> a_2_peers = {a_addrs[0], a_addrs[2]};
// std::vector<Address> a_3_peers = {a_addrs[0], a_addrs[1]};
std::vector<Address> a_1_peers = {a_addrs[1], a_addrs[2]};
std::vector<Address> a_2_peers = {a_addrs[0], a_addrs[2]};
std::vector<Address> a_3_peers = {a_addrs[0], a_addrs[1]};
// ConcreteStorageRsm a_1{std::move(a_io_1), a_1_peers, MockedShardRsm{}};
// ConcreteStorageRsm a_2{std::move(a_io_2), a_2_peers, MockedShardRsm{}};
// ConcreteStorageRsm a_3{std::move(a_io_3), a_3_peers, MockedShardRsm{}};
ConcreteStorageRsm a_1{std::move(a_io_1), a_1_peers, MockedShardRsm{}};
ConcreteStorageRsm a_2{std::move(a_io_2), a_2_peers, MockedShardRsm{}};
ConcreteStorageRsm a_3{std::move(a_io_3), a_3_peers, MockedShardRsm{}};
// auto a_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_1));
// simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[0]);
auto a_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_1));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[0]);
// auto a_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_2));
// simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[1]);
auto a_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_2));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[1]);
// auto a_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_3));
// simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[2]);
auto a_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_3));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[2]);
// // Spin up shard B
// std::vector<Address> b_addrs = {b_io_1.GetAddress(), b_io_2.GetAddress(), b_io_3.GetAddress()};
// Spin up shard B
std::vector<Address> b_addrs = {b_io_1.GetAddress(), b_io_2.GetAddress(), b_io_3.GetAddress()};
// std::vector<Address> b_1_peers = {b_addrs[1], b_addrs[2]};
// std::vector<Address> b_2_peers = {b_addrs[0], b_addrs[2]};
// std::vector<Address> b_3_peers = {b_addrs[0], b_addrs[1]};
std::vector<Address> b_1_peers = {b_addrs[1], b_addrs[2]};
std::vector<Address> b_2_peers = {b_addrs[0], b_addrs[2]};
std::vector<Address> b_3_peers = {b_addrs[0], b_addrs[1]};
// ConcreteStorageRsm b_1{std::move(b_io_1), b_1_peers, MockedShardRsm{}};
// ConcreteStorageRsm b_2{std::move(b_io_2), b_2_peers, MockedShardRsm{}};
// ConcreteStorageRsm b_3{std::move(b_io_3), b_3_peers, MockedShardRsm{}};
ConcreteStorageRsm b_1{std::move(b_io_1), b_1_peers, MockedShardRsm{}};
ConcreteStorageRsm b_2{std::move(b_io_2), b_2_peers, MockedShardRsm{}};
ConcreteStorageRsm b_3{std::move(b_io_3), b_3_peers, MockedShardRsm{}};
// auto b_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_1));
// simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[0]);
auto b_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_1));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[0]);
// auto b_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_2));
// simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[1]);
auto b_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_2));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[1]);
// auto b_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_3));
// simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[2]);
auto b_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_3));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[2]);
// // Spin up coordinators
// Spin up coordinators
// Io<SimulatorTransport> c_io_1 = simulator.RegisterNew();
// c_io_1.SetDefaultTimeout(one_second);
// Io<SimulatorTransport> c_io_2 = simulator.RegisterNew();
// c_io_2.SetDefaultTimeout(one_second);
// Io<SimulatorTransport> c_io_3 = simulator.RegisterNew();
// c_io_3.SetDefaultTimeout(one_second);
Io<SimulatorTransport> c_io_1 = simulator.RegisterNew();
c_io_1.SetDefaultTimeout(one_second);
Io<SimulatorTransport> c_io_2 = simulator.RegisterNew();
c_io_2.SetDefaultTimeout(one_second);
Io<SimulatorTransport> c_io_3 = simulator.RegisterNew();
c_io_3.SetDefaultTimeout(one_second);
// std::vector<Address> c_addrs = {c_io_1.GetAddress(), c_io_2.GetAddress(), c_io_3.GetAddress()};
std::vector<Address> c_addrs = {c_io_1.GetAddress(), c_io_2.GetAddress(), c_io_3.GetAddress()};
// std::vector<Address> c_1_peers = {c_addrs[1], c_addrs[2]};
// std::vector<Address> c_2_peers = {c_addrs[0], c_addrs[2]};
// std::vector<Address> c_3_peers = {c_addrs[0], c_addrs[1]};
std::vector<Address> c_1_peers = {c_addrs[1], c_addrs[2]};
std::vector<Address> c_2_peers = {c_addrs[0], c_addrs[2]};
std::vector<Address> c_3_peers = {c_addrs[0], c_addrs[1]};
// ConcreteCoordinatorRsm c_1{std::move(c_io_1), c_1_peers, Coordinator{(sm1)}};
// ConcreteCoordinatorRsm c_2{std::move(c_io_2), c_2_peers, Coordinator{(sm2)}};
// ConcreteCoordinatorRsm c_3{std::move(c_io_3), c_3_peers, Coordinator{(sm3)}};
ConcreteCoordinatorRsm c_1{std::move(c_io_1), c_1_peers, Coordinator{(sm1)}};
ConcreteCoordinatorRsm c_2{std::move(c_io_2), c_2_peers, Coordinator{(sm2)}};
ConcreteCoordinatorRsm c_3{std::move(c_io_3), c_3_peers, Coordinator{(sm3)}};
// auto c_thread_1 = std::jthread([c_1]() mutable { c_1.Run(); });
// simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[0]);
auto c_thread_1 = std::jthread([c_1]() mutable { c_1.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[0]);
// auto c_thread_2 = std::jthread([c_2]() mutable { c_2.Run(); });
// simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[1]);
auto c_thread_2 = std::jthread([c_2]() mutable { c_2.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[1]);
// auto c_thread_3 = std::jthread([c_3]() mutable { c_3.Run(); });
// simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[2]);
auto c_thread_3 = std::jthread([c_3]() mutable { c_3.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[2]);
// std::cout << "beginning test after servers have become quiescent" << std::endl;
std::cout << "beginning test after servers have become quiescent" << std::endl;
// // Have client contact coordinator RSM for a new transaction ID and
// // also get the current shard map
// CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
// Have client contact coordinator RSM for a new transaction ID and
// also get the current shard map
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
// memgraph::msgs::ShardRequestManager<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io));
msgs::ShardRequestManager<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io));
// io.StartTransaction();
// TestScanAll(io);
// TestCreateVertices(io);
io.StartTransaction();
TestScanVertices(io);
TestCreateVertices(io);
TestCreateExpand(io);
// simulator.ShutDown();
return 0;
simulator.ShutDown();
SimulatorStats stats = simulator.Stats();
std::cout << "total messages: " << stats.total_messages << std::endl;
std::cout << "dropped messages: " << stats.dropped_messages << std::endl;
std::cout << "timed out requests: " << stats.timed_out_requests << std::endl;
std::cout << "total requests: " << stats.total_requests << std::endl;
std::cout << "total responses: " << stats.total_responses << std::endl;
std::cout << "simulator ticks: " << stats.simulator_ticks << std::endl;
std::cout << "========================== SUCCESS :) ==========================" << std::endl;
}
} // namespace memgraph::query::v2::tests
int main() { memgraph::query::v2::tests::DoTest(); }

View File

@ -136,6 +136,7 @@ void Commit(ShardClient &client, const coordinator::Hlc &transaction_timestamp)
auto write_response_result = write_res.GetValue();
auto write_response = std::get<msgs::CommitResponse>(write_response_result);
MG_ASSERT(write_response.success, "Commit expected to be successful, but it is failed");
break;
}
@ -210,7 +211,7 @@ bool AttemptToUpdateVertex(ShardClient &client, int64_t value) {
}
bool AttemptToAddEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2, int64_t edge_gid,
int64_t edge_type_id) {
EdgeTypeId edge_type_id) {
auto id = msgs::EdgeId{};
msgs::Label label = {.id = get_primary_label()};
@ -249,7 +250,7 @@ bool AttemptToAddEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t va
bool AttemptToAddEdgeWithProperties(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2,
int64_t edge_gid, uint64_t edge_prop_id, int64_t edge_prop_val,
const std::vector<uint64_t> &edge_type_id) {
const std::vector<EdgeTypeId> &edge_type_id) {
msgs::EdgeId id1;
msgs::Label label = {.id = get_primary_label()};
@ -282,7 +283,7 @@ bool AttemptToAddEdgeWithProperties(ShardClient &client, int64_t value_of_vertex
}
bool AttemptToDeleteEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2, int64_t edge_gid,
int64_t edge_type_id) {
EdgeTypeId edge_type_id) {
auto id = msgs::EdgeId{};
msgs::Label label = {.id = get_primary_label()};
@ -319,7 +320,7 @@ bool AttemptToDeleteEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t
}
bool AttemptToUpdateEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2, int64_t edge_gid,
int64_t edge_type_id, uint64_t edge_prop_id, int64_t edge_prop_val) {
EdgeTypeId edge_type_id, uint64_t edge_prop_id, int64_t edge_prop_val) {
auto id = msgs::EdgeId{};
msgs::Label label = {.id = get_primary_label()};
@ -338,7 +339,7 @@ bool AttemptToUpdateEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t
auto edge_prop = std::vector<std::pair<PropertyId, msgs::Value>>{
std::make_pair(PropertyId::FromUint(edge_prop_id), msgs::Value(edge_prop_val))};
msgs::UpdateEdgeProp update_props{.src = src, .dst = dst, .edge_id = id, .property_updates = edge_prop};
msgs::UpdateEdgeProp update_props{.edge_id = id, .src = src, .dst = dst, .property_updates = edge_prop};
msgs::UpdateEdgesRequest update_req{};
update_req.transaction_id.logical_id = GetTransactionId();
@ -444,61 +445,7 @@ std::tuple<size_t, std::optional<msgs::VertexId>> AttemptToScanAllWithExpression
}
}
void AttemptToExpandOneWithWrongEdgeType(ShardClient &client, uint64_t src_vertex_val, uint64_t edge_type_id) {
// Source vertex
msgs::Label label = {.id = get_primary_label()};
auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val));
// Edge type
auto edge_type = msgs::EdgeType{};
edge_type.id = edge_type_id + 1;
// Edge direction
auto edge_direction = msgs::EdgeDirection::OUT;
// Source Vertex properties to look for
std::optional<std::vector<PropertyId>> src_vertex_properties = {};
// Edge properties to look for
std::optional<std::vector<PropertyId>> edge_properties = {};
std::vector<msgs::Expression> expressions;
std::optional<std::vector<msgs::OrderBy>> order_by = {};
std::optional<size_t> limit = {};
std::optional<msgs::Filter> filter = {};
msgs::ExpandOneRequest expand_one_req{};
expand_one_req.direction = edge_direction;
expand_one_req.edge_properties = edge_properties;
expand_one_req.edge_types = {edge_type};
expand_one_req.expressions = expressions;
expand_one_req.filter = filter;
expand_one_req.limit = limit;
expand_one_req.order_by = order_by;
expand_one_req.src_vertex_properties = src_vertex_properties;
expand_one_req.src_vertices = {src_vertex};
expand_one_req.transaction_id.logical_id = GetTransactionId();
while (true) {
auto read_res = client.SendReadRequest(expand_one_req);
if (read_res.HasError()) {
continue;
}
auto write_response_result = read_res.GetValue();
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
MG_ASSERT(write_response.result.size() == 1);
MG_ASSERT(write_response.result[0].edges_with_all_properties);
MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 0);
MG_ASSERT(!write_response.result[0].edges_with_specific_properties);
break;
}
}
void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, uint64_t edge_type_id) {
void AttemptToExpandOneWithWrongEdgeType(ShardClient &client, uint64_t src_vertex_val, EdgeTypeId edge_type_id) {
// Source vertex
msgs::Label label = {.id = get_primary_label()};
auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val));
@ -543,17 +490,74 @@ void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, uint
auto write_response_result = read_res.GetValue();
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
MG_ASSERT(write_response.result.size() == 1);
MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 2);
auto number_of_properties_on_edge =
(std::get<std::map<PropertyId, msgs::Value>>(write_response.result[0].edges_with_all_properties.value()[0]))
.size();
MG_ASSERT(write_response.result[0].in_edges_with_all_properties.empty());
MG_ASSERT(write_response.result[0].out_edges_with_all_properties.empty());
MG_ASSERT(write_response.result[0].in_edges_with_specific_properties.empty());
MG_ASSERT(write_response.result[0].out_edges_with_specific_properties.empty());
break;
}
}
void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, EdgeTypeId edge_type_id) {
// Source vertex
msgs::Label label = {.id = get_primary_label()};
auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val));
// Edge type
auto edge_type = msgs::EdgeType{};
edge_type.id = edge_type_id;
// Edge direction
auto edge_direction = msgs::EdgeDirection::OUT;
// Source Vertex properties to look for
std::optional<std::vector<PropertyId>> src_vertex_properties = {};
// Edge properties to look for
std::optional<std::vector<PropertyId>> edge_properties = {};
std::vector<msgs::Expression> expressions;
std::optional<std::vector<msgs::OrderBy>> order_by = {};
std::optional<size_t> limit = {};
std::optional<msgs::Filter> filter = {};
msgs::ExpandOneRequest expand_one_req{};
expand_one_req.direction = edge_direction;
expand_one_req.edge_properties = edge_properties;
expand_one_req.edge_types = {edge_type};
expand_one_req.expressions = expressions;
expand_one_req.filter = filter;
expand_one_req.limit = limit;
expand_one_req.order_by = order_by;
expand_one_req.src_vertex_properties = src_vertex_properties;
expand_one_req.src_vertices = {src_vertex};
expand_one_req.transaction_id.logical_id = GetTransactionId();
while (true) {
auto read_res = client.SendReadRequest(expand_one_req);
if (read_res.HasError()) {
continue;
}
auto write_response_result = read_res.GetValue();
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
MG_ASSERT(write_response.result.size() == 1);
MG_ASSERT(write_response.result[0].out_edges_with_all_properties.size() == 2);
MG_ASSERT(write_response.result[0].in_edges_with_all_properties.empty());
MG_ASSERT(write_response.result[0].in_edges_with_specific_properties.empty());
MG_ASSERT(write_response.result[0].out_edges_with_specific_properties.empty());
const auto number_of_properties_on_edge =
(write_response.result[0].out_edges_with_all_properties[0]).properties.size();
MG_ASSERT(number_of_properties_on_edge == 1);
break;
}
}
void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uint64_t src_vertex_val,
uint64_t edge_type_id) {
EdgeTypeId edge_type_id) {
// Source vertex
msgs::Label label = {.id = get_primary_label()};
auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val));
@ -599,19 +603,21 @@ void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uin
auto write_response_result = read_res.GetValue();
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
MG_ASSERT(write_response.result.size() == 1);
auto src_vertex_props_size = write_response.result[0].src_vertex_properties->size();
auto src_vertex_props_size = write_response.result[0].src_vertex_properties.size();
MG_ASSERT(src_vertex_props_size == 1);
MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 2);
auto number_of_properties_on_edge =
(std::get<std::map<PropertyId, msgs::Value>>(write_response.result[0].edges_with_all_properties.value()[0]))
.size();
MG_ASSERT(write_response.result[0].out_edges_with_all_properties.size() == 2);
MG_ASSERT(write_response.result[0].in_edges_with_all_properties.empty());
MG_ASSERT(write_response.result[0].in_edges_with_specific_properties.empty());
MG_ASSERT(write_response.result[0].out_edges_with_specific_properties.empty());
const auto number_of_properties_on_edge =
(write_response.result[0].out_edges_with_all_properties[0]).properties.size();
MG_ASSERT(number_of_properties_on_edge == 1);
break;
}
}
void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t src_vertex_val, uint64_t edge_type_id,
uint64_t edge_prop_id) {
void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t src_vertex_val,
EdgeTypeId edge_type_id, uint64_t edge_prop_id) {
// Source vertex
msgs::Label label = {.id = get_primary_label()};
auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val));
@ -657,9 +663,13 @@ void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t
auto write_response_result = read_res.GetValue();
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
MG_ASSERT(write_response.result.size() == 1);
auto specific_properties_size =
(std::get<std::vector<msgs::Value>>(write_response.result[0].edges_with_specific_properties.value()[0]));
MG_ASSERT(specific_properties_size.size() == 1);
MG_ASSERT(write_response.result[0].out_edges_with_specific_properties.size() == 2);
MG_ASSERT(write_response.result[0].in_edges_with_specific_properties.empty());
MG_ASSERT(write_response.result[0].in_edges_with_all_properties.empty());
MG_ASSERT(write_response.result[0].out_edges_with_all_properties.empty());
const auto specific_properties_size =
(write_response.result[0].out_edges_with_specific_properties[0]).properties.size();
MG_ASSERT(specific_properties_size == 1);
break;
}
}
@ -693,7 +703,7 @@ void TestCreateEdge(ShardClient &client) {
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2));
auto edge_gid = GetUniqueInteger();
auto edge_type_id = GetUniqueInteger();
auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger());
MG_ASSERT(AttemptToAddEdge(client, unique_prop_val_1, unique_prop_val_2, edge_gid, edge_type_id));
}
@ -707,7 +717,7 @@ void TestCreateAndDeleteEdge(ShardClient &client) {
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2));
auto edge_gid = GetUniqueInteger();
auto edge_type_id = GetUniqueInteger();
auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger());
MG_ASSERT(AttemptToAddEdge(client, unique_prop_val_1, unique_prop_val_2, edge_gid, edge_type_id));
@ -724,7 +734,7 @@ void TestUpdateEdge(ShardClient &client) {
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2));
auto edge_gid = GetUniqueInteger();
auto edge_type_id = GetUniqueInteger();
auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger());
auto edge_prop_id = GetUniqueInteger();
auto edge_prop_val_old = GetUniqueInteger();
@ -819,7 +829,8 @@ void TestExpandOne(ShardClient &client) {
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_3));
auto edge_type_id = GetUniqueInteger();
auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger());
auto wrong_edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger());
auto edge_gid_1 = GetUniqueInteger();
auto edge_gid_2 = GetUniqueInteger();
@ -835,7 +846,7 @@ void TestExpandOne(ShardClient &client) {
edge_prop_val, {edge_type_id}));
AttemptToExpandOneSimple(client, unique_prop_val_1, edge_type_id);
AttemptToExpandOneWithWrongEdgeType(client, unique_prop_val_1, edge_type_id);
AttemptToExpandOneWithWrongEdgeType(client, unique_prop_val_1, wrong_edge_type_id);
AttemptToExpandOneWithSpecifiedSrcVertexProperties(client, unique_prop_val_1, edge_type_id);
AttemptToExpandOneWithSpecifiedEdgeProperties(client, unique_prop_val_1, edge_type_id, edge_prop_id);
}

View File

@ -86,6 +86,7 @@ ShardMap TestShardMap() {
const auto label_id = sm.InitializeNewLabel(kLabelName, schema, replication_factor, sm.shard_map_version);
EXPECT_TRUE(label_id.has_value());
sm.AllocateEdgeTypeIds(std::vector<std::string>{"edge_type"});
// split the shard at N split points
// NB: this is the logic that should be provided by the "split file"
// TODO(tyler) split points should account for signedness
@ -116,12 +117,11 @@ void TestScanAll(ShardRequestManager &shard_request_manager) {
EXPECT_EQ(result.size(), 2);
}
template <typename ShardRequestManager>
void TestCreateVertices(ShardRequestManager &shard_request_manager) {
void TestCreateVertices(msgs::ShardRequestManagerInterface &shard_request_manager) {
using PropVal = msgs::Value;
msgs::ExecutionState<msgs::CreateVerticesRequest> state;
std::vector<msgs::NewVertex> new_vertices;
auto label_id = shard_request_manager.LabelNameToLabelId(kLabelName);
auto label_id = shard_request_manager.NameToLabel(kLabelName);
msgs::NewVertex a1{.primary_key = {PropVal(int64_t(0)), PropVal(int64_t(0))}};
a1.label_ids.push_back({label_id});
msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}};
@ -133,8 +133,40 @@ void TestCreateVertices(ShardRequestManager &shard_request_manager) {
EXPECT_EQ(result.size(), 1);
}
template <typename ShardRequestManager>
void TestExpand(ShardRequestManager &shard_request_manager) {}
void TestCreateExpand(msgs::ShardRequestManagerInterface &shard_request_manager) {
using PropVal = msgs::Value;
msgs::ExecutionState<msgs::CreateExpandRequest> state;
std::vector<msgs::NewExpand> new_expands;
const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type");
const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")};
const msgs::VertexId vertex_id_1{label, {PropVal(int64_t(0)), PropVal(int64_t(0))}};
const msgs::VertexId vertex_id_2{label, {PropVal(int64_t(13)), PropVal(int64_t(13))}};
msgs::NewExpand expand_1{
.id = {.gid = 0}, .type = {edge_type_id}, .src_vertex = vertex_id_1, .dest_vertex = vertex_id_2};
msgs::NewExpand expand_2{
.id = {.gid = 1}, .type = {edge_type_id}, .src_vertex = vertex_id_2, .dest_vertex = vertex_id_1};
new_expands.push_back(std::move(expand_1));
new_expands.push_back(std::move(expand_2));
auto responses = shard_request_manager.Request(state, std::move(new_expands));
MG_ASSERT(responses.size() == 1);
MG_ASSERT(responses[0].success);
}
void TestExpandOne(msgs::ShardRequestManagerInterface &shard_request_manager) {
msgs::ExecutionState<msgs::ExpandOneRequest> state{};
msgs::ExpandOneRequest request;
const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type");
const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")};
request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}});
request.edge_types.push_back(msgs::EdgeType{edge_type_id});
request.direction = msgs::EdgeDirection::BOTH;
auto result_rows = shard_request_manager.Request(state, std::move(request));
MG_ASSERT(result_rows.size() == 1);
MG_ASSERT(result_rows[0].in_edges_with_all_properties.size() == 1);
MG_ASSERT(result_rows[0].out_edges_with_all_properties.size() == 1);
}
template <typename ShardRequestManager>
void TestAggregate(ShardRequestManager &shard_request_manager) {}
@ -198,6 +230,8 @@ TEST(MachineManager, BasicFunctionality) {
shard_request_manager.StartTransaction();
TestCreateVertices(shard_request_manager);
TestScanAll(shard_request_manager);
TestCreateExpand(shard_request_manager);
TestExpandOne(shard_request_manager);
local_system.ShutDown();
};