Implement missing message handlers (#563)

Implement the missing message handlers with basic functionality. The
implementation does not include any capabilities to filter based on
expressions.
This commit is contained in:
gvolfing 2022-10-03 15:31:06 +02:00 committed by GitHub
parent b5c7078c7d
commit 87111b2f89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1119 additions and 128 deletions

View File

@ -57,17 +57,10 @@ struct EdgeId {
Gid gid;
};
struct Vertex {
VertexId id;
std::vector<Label> labels;
friend bool operator==(const Vertex &lhs, const Vertex &rhs) {
return (lhs.id == rhs.id) && (lhs.labels == rhs.labels);
}
};
struct Edge {
VertexId src;
VertexId dst;
std::optional<std::vector<std::pair<PropertyId, Value>>> properties;
EdgeId id;
EdgeType type;
friend bool operator==(const Edge &lhs, const Edge &rhs) {
@ -75,6 +68,12 @@ struct Edge {
}
};
struct Vertex {
VertexId id;
std::vector<Label> labels;
friend bool operator==(const Vertex &lhs, const Vertex &rhs) { return lhs.id == rhs.id; }
};
struct PathPart {
Vertex dst;
Gid edge;
@ -458,8 +457,15 @@ struct ExpandOneResultRow {
// The drawback of this is currently the key of the map is always interpreted as a string in Value, not as an
// integer, which should be in case of mapped properties.
Vertex src_vertex;
std::optional<Values> src_vertex_properties;
Values edges;
std::optional<std::map<PropertyId, Value>> src_vertex_properties;
// NOTE: If the desired edges are specified in the request,
// edges_with_specific_properties will have a value and it will
// return the properties as a vector of property values. The order
// of the values returned should be the same as the PropertyIds
// were defined in the request.
std::optional<std::vector<std::tuple<VertexId, Gid, std::map<PropertyId, Value>>>> edges_with_all_properties;
std::optional<std::vector<std::tuple<VertexId, Gid, std::vector<Value>>>> edges_with_specific_properties;
};
struct ExpandOneResponse {
@ -467,12 +473,14 @@ struct ExpandOneResponse {
};
struct UpdateVertexProp {
VertexId primary_key;
PrimaryKey primary_key;
std::vector<std::pair<PropertyId, Value>> property_updates;
};
struct UpdateEdgeProp {
Edge edge;
EdgeId edge_id;
VertexId src;
VertexId dst;
std::vector<std::pair<PropertyId, Value>> property_updates;
};

View File

@ -16,8 +16,9 @@ set(storage_v3_src_files
schemas.cpp
schema_validator.cpp
shard.cpp
bindings/typed_value.cpp
storage.cpp
shard_rsm.cpp
bindings/typed_value.cpp
storage.cpp)
# ######################

View File

@ -9,9 +9,11 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <functional>
#include <iterator>
#include <utility>
#include "parser/opencypher/parser.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/shard_rsm.hpp"
#include "storage/v3/value_conversions.hpp"
@ -20,6 +22,7 @@
using memgraph::msgs::Label;
using memgraph::msgs::PropertyId;
using memgraph::msgs::Value;
using memgraph::msgs::Vertex;
using memgraph::msgs::VertexId;
using memgraph::storage::conversions::ConvertPropertyVector;
@ -28,7 +31,6 @@ using memgraph::storage::conversions::ToPropertyValue;
using memgraph::storage::conversions::ToValue;
namespace {
std::vector<std::pair<memgraph::storage::v3::PropertyId, memgraph::storage::v3::PropertyValue>> ConvertPropertyMap(
std::vector<std::pair<PropertyId, Value>> &&properties) {
std::vector<std::pair<memgraph::storage::v3::PropertyId, memgraph::storage::v3::PropertyValue>> ret;
@ -53,7 +55,7 @@ std::vector<std::pair<memgraph::storage::v3::PropertyId, Value>> FromMap(
return ret;
}
std::optional<std::map<PropertyId, Value>> CollectPropertiesFromAccessor(
std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(
const memgraph::storage::v3::VertexAccessor &acc, const std::vector<memgraph::storage::v3::PropertyId> &props,
memgraph::storage::v3::View view) {
std::map<PropertyId, Value> ret;
@ -62,14 +64,14 @@ std::optional<std::map<PropertyId, Value>> CollectPropertiesFromAccessor(
auto result = acc.GetProperty(prop, view);
if (result.HasError()) {
spdlog::debug("Encountered an Error while trying to get a vertex property.");
continue;
return std::nullopt;
}
auto &value = result.GetValue();
if (value.IsNull()) {
spdlog::debug("The specified property does not exist but it should");
continue;
return std::nullopt;
}
ret.emplace(prop, ToValue(value));
ret.emplace(std::make_pair(prop, ToValue(value)));
}
return ret;
@ -81,6 +83,7 @@ std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(
auto iter = acc.Properties(view);
if (iter.HasError()) {
spdlog::debug("Encountered an error while trying to get vertex properties.");
return std::nullopt;
}
for (const auto &[prop_key, prop_val] : iter.GetValue()) {
@ -109,6 +112,293 @@ Value ConstructValueVertex(const memgraph::storage::v3::VertexAccessor &acc, mem
return Value({.id = vertex_id, .labels = value_labels});
}
bool DoesEdgeTypeMatch(const memgraph::msgs::ExpandOneRequest &req, const memgraph::storage::v3::EdgeAccessor &edge) {
// TODO(gvolfing) This should be checked only once and handled accordingly.
if (req.edge_types.empty()) {
return true;
}
return std::ranges::any_of(req.edge_types.cbegin(), req.edge_types.cend(),
[&edge](const memgraph::msgs::EdgeType &edge_type) {
return memgraph::storage::v3::EdgeTypeId::FromUint(edge_type.id) == edge.EdgeType();
});
}
struct LocalError {};
std::optional<memgraph::msgs::Vertex> FillUpSourceVertex(
const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc, memgraph::msgs::ExpandOneRequest &req,
memgraph::msgs::VertexId src_vertex) {
auto secondary_labels = v_acc->Labels(memgraph::storage::v3::View::OLD);
if (secondary_labels.HasError()) {
spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}",
req.transaction_id.logical_id);
return std::nullopt;
}
memgraph::msgs::Vertex source_vertex;
source_vertex.id = src_vertex;
source_vertex.labels.reserve(secondary_labels.GetValue().size());
for (auto label_id : secondary_labels.GetValue()) {
source_vertex.labels.emplace_back(memgraph::msgs::Label{.id = label_id});
}
return source_vertex;
}
std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(
const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc, memgraph::msgs::ExpandOneRequest &req) {
std::map<PropertyId, Value> src_vertex_properties;
if (!req.src_vertex_properties) {
auto props = v_acc->Properties(memgraph::storage::v3::View::OLD);
if (props.HasError()) {
spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}",
req.transaction_id.logical_id);
return std::nullopt;
}
for (auto &[key, val] : props.GetValue()) {
src_vertex_properties.insert(std::make_pair(key, ToValue(val)));
}
} else if (req.src_vertex_properties.value().empty()) {
// NOOP
} else {
for (const auto &prop : req.src_vertex_properties.value()) {
const auto &prop_val = v_acc->GetProperty(prop, memgraph::storage::v3::View::OLD);
src_vertex_properties.insert(std::make_pair(prop, ToValue(prop_val.GetValue())));
}
}
return src_vertex_properties;
}
std::optional<std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>> FillUpConnectingEdges(
const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc, memgraph::msgs::ExpandOneRequest &req) {
std::vector<memgraph::storage::v3::EdgeAccessor> in_edges;
std::vector<memgraph::storage::v3::EdgeAccessor> out_edges;
switch (req.direction) {
case memgraph::msgs::EdgeDirection::OUT: {
auto out_edges_result = v_acc->OutEdges(memgraph::storage::v3::View::OLD);
if (out_edges_result.HasError()) {
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
req.transaction_id.logical_id);
return std::nullopt;
}
out_edges = std::move(out_edges_result.GetValue());
break;
}
case memgraph::msgs::EdgeDirection::IN: {
auto in_edges_result = v_acc->InEdges(memgraph::storage::v3::View::OLD);
if (in_edges_result.HasError()) {
spdlog::debug(
"Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}"[req.transaction_id
.logical_id]);
return std::nullopt;
}
in_edges = std::move(in_edges_result.GetValue());
break;
}
case memgraph::msgs::EdgeDirection::BOTH: {
auto in_edges_result = v_acc->InEdges(memgraph::storage::v3::View::OLD);
if (in_edges_result.HasError()) {
spdlog::debug("Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}",
req.transaction_id.logical_id);
return std::nullopt;
}
in_edges = std::move(in_edges_result.GetValue());
auto out_edges_result = v_acc->OutEdges(memgraph::storage::v3::View::OLD);
if (out_edges_result.HasError()) {
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
req.transaction_id.logical_id);
return std::nullopt;
}
out_edges = std::move(out_edges_result.GetValue());
break;
}
}
return std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>{in_edges, out_edges};
}
using AllEdgePropertyDataSructure = std::map<PropertyId, memgraph::msgs::Value>;
using SpecificEdgePropertyDataSructure = std::vector<memgraph::msgs::Value>;
using AllEdgeProperties = std::tuple<memgraph::msgs::VertexId, memgraph::msgs::Gid, AllEdgePropertyDataSructure>;
using SpecificEdgeProperties =
std::tuple<memgraph::msgs::VertexId, memgraph::msgs::Gid, SpecificEdgePropertyDataSructure>;
using SpecificEdgePropertiesVector = std::vector<SpecificEdgeProperties>;
using AllEdgePropertiesVector = std::vector<AllEdgeProperties>;
template <typename ReturnType, typename EdgeProperties, typename EdgePropertyDataStructure, typename Functor>
std::optional<ReturnType> GetEdgesWithProperties(const std::vector<memgraph::storage::v3::EdgeAccessor> &edges,
const memgraph::msgs::ExpandOneRequest &req,
Functor get_edge_properties) {
ReturnType ret;
ret.reserve(edges.size());
for (const auto &edge : edges) {
if (!DoesEdgeTypeMatch(req, edge)) {
continue;
}
EdgeProperties ret_tuple;
memgraph::msgs::Label label;
label.id = edge.FromVertex().primary_label;
memgraph::msgs::VertexId other_vertex = std::make_pair(label, ConvertValueVector(edge.FromVertex().primary_key));
const auto edge_props_var = get_edge_properties(edge);
if (std::get_if<LocalError>(&edge_props_var) != nullptr) {
return std::nullopt;
}
auto edge_props = std::get<EdgePropertyDataStructure>(edge_props_var);
memgraph::msgs::Gid gid = edge.Gid().AsUint();
ret.emplace_back(EdgeProperties{other_vertex, gid, edge_props});
}
return ret;
}
template <typename TPropertyValue, typename TPropertyNullopt>
void SetFinalEdgeProperties(std::optional<TPropertyValue> &properties_to_value,
std::optional<TPropertyNullopt> &properties_to_nullopt, const TPropertyValue &ret_out,
const TPropertyValue &ret_in, const memgraph::msgs::ExpandOneRequest &req) {
switch (req.direction) {
case memgraph::msgs::EdgeDirection::OUT: {
properties_to_value = std::move(ret_out);
break;
}
case memgraph::msgs::EdgeDirection::IN: {
properties_to_value = std::move(ret_in);
break;
}
case memgraph::msgs::EdgeDirection::BOTH: {
TPropertyValue ret;
ret.resize(ret_out.size() + ret_in.size());
ret.insert(ret.end(), std::make_move_iterator(ret_in.begin()), std::make_move_iterator(ret_in.end()));
ret.insert(ret.end(), std::make_move_iterator(ret_out.begin()), std::make_move_iterator(ret_out.end()));
properties_to_value = ret;
break;
}
}
properties_to_nullopt = {};
}
std::optional<memgraph::msgs::ExpandOneResultRow> GetExpandOneResult(memgraph::storage::v3::Shard::Accessor &acc,
memgraph::msgs::VertexId src_vertex,
memgraph::msgs::ExpandOneRequest req) {
using EdgeProperties =
std::variant<LocalError, std::map<PropertyId, memgraph::msgs::Value>, std::vector<memgraph::msgs::Value>>;
std::function<EdgeProperties(const memgraph::storage::v3::EdgeAccessor &)> get_edge_properties;
if (!req.edge_properties) {
get_edge_properties = [&req](const memgraph::storage::v3::EdgeAccessor &edge) -> EdgeProperties {
std::map<PropertyId, memgraph::msgs::Value> ret;
auto property_results = edge.Properties(memgraph::storage::v3::View::OLD);
if (property_results.HasError()) {
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
req.transaction_id.logical_id);
return LocalError{};
}
for (const auto &[prop_key, prop_val] : property_results.GetValue()) {
ret.insert(std::make_pair(prop_key, ToValue(prop_val)));
}
return ret;
};
} else {
// TODO(gvolfing) - do we want to set the action_successful here?
get_edge_properties = [&req](const memgraph::storage::v3::EdgeAccessor &edge) {
std::vector<memgraph::msgs::Value> ret;
ret.reserve(req.edge_properties.value().size());
for (const auto &edge_prop : req.edge_properties.value()) {
// TODO(gvolfing) maybe check for the absence of certain properties
ret.emplace_back(ToValue(edge.GetProperty(edge_prop, memgraph::storage::v3::View::OLD).GetValue()));
}
return ret;
};
}
/// Fill up source vertex
auto v_acc = acc.FindVertex(ConvertPropertyVector(std::move(src_vertex.second)), memgraph::storage::v3::View::OLD);
auto source_vertex = FillUpSourceVertex(v_acc, req, src_vertex);
if (!source_vertex) {
return std::nullopt;
}
/// Fill up source vertex properties
auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req);
if (!src_vertex_properties) {
return std::nullopt;
}
/// Fill up connecting edges
auto fill_up_connecting_edges = FillUpConnectingEdges(v_acc, req);
if (!fill_up_connecting_edges) {
return std::nullopt;
}
auto [in_edges, out_edges] = fill_up_connecting_edges.value();
/// Assemble the edge properties
std::optional<AllEdgePropertiesVector> edges_with_all_properties;
std::optional<SpecificEdgePropertiesVector> edges_with_specific_properties;
if (!req.edge_properties) {
auto ret_in_opt = GetEdgesWithProperties<AllEdgePropertiesVector, AllEdgeProperties, AllEdgePropertyDataSructure>(
in_edges, req, get_edge_properties);
if (!ret_in_opt) {
return std::nullopt;
}
auto ret_out_opt = GetEdgesWithProperties<AllEdgePropertiesVector, AllEdgeProperties, AllEdgePropertyDataSructure>(
out_edges, req, get_edge_properties);
if (!ret_out_opt) {
return std::nullopt;
}
auto &ret_in = *ret_in_opt;
auto &ret_out = *ret_out_opt;
SetFinalEdgeProperties<AllEdgePropertiesVector, SpecificEdgePropertiesVector>(
edges_with_all_properties, edges_with_specific_properties, ret_out, ret_in, req);
} else {
auto ret_in_opt =
GetEdgesWithProperties<SpecificEdgePropertiesVector, SpecificEdgeProperties, SpecificEdgePropertyDataSructure>(
in_edges, req, get_edge_properties);
if (!ret_in_opt) {
return std::nullopt;
}
auto ret_out_opt =
GetEdgesWithProperties<SpecificEdgePropertiesVector, SpecificEdgeProperties, SpecificEdgePropertyDataSructure>(
out_edges, req, get_edge_properties);
if (!ret_out_opt) {
return std::nullopt;
}
auto &ret_in = *ret_in_opt;
auto &ret_out = *ret_out_opt;
SetFinalEdgeProperties<SpecificEdgePropertiesVector, AllEdgePropertiesVector>(
edges_with_specific_properties, edges_with_all_properties, ret_out, ret_in, req);
}
return memgraph::msgs::ExpandOneResultRow{
.src_vertex = std::move(*source_vertex),
.src_vertex_properties = std::move(src_vertex_properties),
.edges_with_all_properties = std::move(edges_with_all_properties),
.edges_with_specific_properties = std::move(edges_with_specific_properties)};
}
} // namespace
namespace memgraph::storage::v3 {
@ -164,7 +454,56 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) {
}
}
return msgs::CreateVerticesResponse{action_successful};
return memgraph::msgs::CreateVerticesResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateVerticesRequest &&req) {
auto acc = shard_->Access(req.transaction_id);
bool action_successful = true;
for (auto &vertex : req.new_properties) {
if (!action_successful) {
break;
}
auto vertex_to_update = acc.FindVertex(ConvertPropertyVector(std::move(vertex.primary_key)), View::OLD);
if (!vertex_to_update) {
action_successful = false;
spdlog::debug("Vertex could not be found while trying to update its properties. Transaction id: {}",
req.transaction_id.logical_id);
continue;
}
for (auto &update_prop : vertex.property_updates) {
// TODO(gvolfing) Maybe check if the setting is valid if SetPropertyAndValidate()
// does not do that alreaedy.
auto result_schema =
vertex_to_update->SetPropertyAndValidate(update_prop.first, ToPropertyValue(std::move(update_prop.second)));
if (result_schema.HasError()) {
auto &error = result_schema.GetError();
std::visit(
[&action_successful]<typename T>(T &&) {
using ErrorType = std::remove_cvref_t<T>;
if constexpr (std::is_same_v<ErrorType, SchemaViolation>) {
action_successful = false;
spdlog::debug("Updating vertex failed with error: SchemaViolation");
} else if constexpr (std::is_same_v<ErrorType, Error>) {
action_successful = false;
spdlog::debug("Updating vertex failed with error: Error");
} else {
static_assert(kAlwaysFalse<T>, "Missing type from variant visitor");
}
},
error);
break;
}
}
}
return memgraph::msgs::UpdateVerticesResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) {
@ -187,7 +526,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) {
// Since we will not have different kinds of deletion types in one transaction,
// we dont have to enter the switch statement on every iteration. Optimize this.
switch (req.deletion_type) {
case msgs::DeleteVerticesRequest::DeletionType::DELETE: {
case memgraph::msgs::DeleteVerticesRequest::DeletionType::DELETE: {
auto result = acc.DeleteVertex(&vertex_acc.value());
if (result.HasError() || !(result.GetValue().has_value())) {
action_successful = false;
@ -196,7 +535,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) {
break;
}
case msgs::DeleteVerticesRequest::DeletionType::DETACH_DELETE: {
case memgraph::msgs::DeleteVerticesRequest::DeletionType::DETACH_DELETE: {
auto result = acc.DetachDeleteVertex(&vertex_acc.value());
if (result.HasError() || !(result.GetValue().has_value())) {
action_successful = false;
@ -210,7 +549,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) {
}
}
return msgs::DeleteVerticesResponse{action_successful};
return memgraph::msgs::DeleteVerticesResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateEdgesRequest &&req) {
@ -241,17 +580,110 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateEdgesRequest &&req) {
spdlog::debug("Creating edge was not successful. Transaction id: {}", req.transaction_id.logical_id);
break;
}
// Add properties to the edge if there is any
if (edge.properties) {
for (auto &[edge_prop_key, edge_prop_val] : edge.properties.value()) {
auto set_result = edge_acc->SetProperty(edge_prop_key, ToPropertyValue(std::move(edge_prop_val)));
if (set_result.HasError()) {
action_successful = false;
spdlog::debug("Adding property to edge was not successful. Transaction id: {}",
req.transaction_id.logical_id);
break;
}
}
}
}
return msgs::CreateEdgesResponse{action_successful};
return memgraph::msgs::CreateEdgesResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteEdgesRequest &&req) {
bool action_successful = true;
auto acc = shard_->Access(req.transaction_id);
for (auto &edge : req.edges) {
if (!action_successful) {
break;
}
auto edge_acc = acc.DeleteEdge(VertexId(edge.src.first.id, ConvertPropertyVector(std::move(edge.src.second))),
VertexId(edge.dst.first.id, ConvertPropertyVector(std::move(edge.dst.second))),
Gid::FromUint(edge.id.gid));
if (edge_acc.HasError() || !edge_acc.HasValue()) {
spdlog::debug("Error while trying to delete edge. Transaction id: {}", req.transaction_id.logical_id);
action_successful = false;
continue;
}
}
return memgraph::msgs::DeleteEdgesResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateEdgesRequest &&req) {
auto acc = shard_->Access(req.transaction_id);
bool action_successful = true;
for (auto &edge : req.new_properties) {
if (!action_successful) {
break;
}
auto vertex_acc = acc.FindVertex(ConvertPropertyVector(std::move(edge.src.second)), View::OLD);
if (!vertex_acc) {
action_successful = false;
spdlog::debug("Encountered an error while trying to acquire VertexAccessor with transaction id: {}",
req.transaction_id.logical_id);
continue;
}
// Since we are using the source vertex of the edge we are only intrested
// in the vertex's out-going edges
auto edges_res = vertex_acc->OutEdges(View::OLD);
if (edges_res.HasError()) {
action_successful = false;
spdlog::debug("Encountered an error while trying to acquire EdgeAccessor with transaction id: {}",
req.transaction_id.logical_id);
continue;
}
auto &edge_accessors = edges_res.GetValue();
// Look for the appropriate edge accessor
bool edge_accessor_did_match = false;
for (auto &edge_accessor : edge_accessors) {
if (edge_accessor.Gid().AsUint() == edge.edge_id.gid) { // Found the appropriate accessor
edge_accessor_did_match = true;
for (auto &[key, value] : edge.property_updates) {
// TODO(gvolfing)
// Check if the property was set if SetProperty does not do that itself.
auto res = edge_accessor.SetProperty(key, ToPropertyValue(std::move(value)));
if (res.HasError()) {
spdlog::debug("Encountered an error while trying to set the property of an Edge with transaction id: {}",
req.transaction_id.logical_id);
}
}
}
}
if (!edge_accessor_did_match) {
action_successful = false;
spdlog::debug("Could not find the Edge with the specified Gid. Transaction id: {}",
req.transaction_id.logical_id);
continue;
}
}
return memgraph::msgs::UpdateEdgesResponse{.success = action_successful};
}
msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
auto acc = shard_->Access(req.transaction_id);
bool action_successful = true;
std::vector<msgs::ScanResultRow> results;
std::optional<msgs::VertexId> next_start_id;
std::vector<memgraph::msgs::ScanResultRow> results;
std::optional<memgraph::msgs::VertexId> next_start_id;
const auto view = View(req.storage_view);
auto vertex_iterable = acc.Vertices(view);
@ -271,13 +703,17 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
std::optional<std::map<PropertyId, Value>> found_props;
if (req.props_to_return) {
found_props = CollectPropertiesFromAccessor(vertex, req.props_to_return.value(), view);
found_props = CollectSpecificPropertiesFromAccessor(vertex, req.props_to_return.value(), view);
} else {
found_props = CollectAllPropertiesFromAccessor(vertex, view);
}
// TODO(gvolfing) -VERIFY-
// Vertex is seperated from the properties in the response.
// Is it useful to return just a vertex without the properties?
if (!found_props) {
continue;
action_successful = false;
break;
}
results.emplace_back(msgs::ScanResultRow{.vertex = ConstructValueVertex(vertex, view).vertex_v,
@ -295,7 +731,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
}
}
msgs::ScanVerticesResponse resp{};
memgraph::msgs::ScanVerticesResponse resp{};
resp.success = action_successful;
if (action_successful) {
@ -306,24 +742,39 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
return resp;
}
msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
auto acc = shard_->Access(req.transaction_id);
bool action_successful = true;
std::vector<memgraph::msgs::ExpandOneResultRow> results;
for (auto &src_vertex : req.src_vertices) {
auto result = GetExpandOneResult(acc, src_vertex, req);
if (!result) {
action_successful = false;
break;
}
results.emplace_back(result.value());
}
memgraph::msgs::ExpandOneResponse resp{};
if (action_successful) {
resp.result = std::move(results);
}
return resp;
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) {
shard_->Access(req.transaction_id).Commit(req.commit_timestamp);
return msgs::CommitResponse{true};
return memgraph::msgs::CommitResponse{true};
};
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateVerticesRequest && /*req*/) {
return msgs::UpdateVerticesResponse{};
}
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteEdgesRequest && /*req*/) { return msgs::DeleteEdgesResponse{}; }
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateEdgesRequest && /*req*/) { return msgs::UpdateEdgesResponse{}; }
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest && /*req*/) { return msgs::ExpandOneResponse{}; }
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest && /*req*/) {
return msgs::GetPropertiesResponse{};
return memgraph::msgs::GetPropertiesResponse{};
}
} // namespace memgraph::storage::v3

View File

@ -14,6 +14,7 @@
#include <memory>
#include <variant>
#include <openssl/ec.h>
#include "query/v2/requests.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/vertex_accessor.hpp"

View File

@ -10,6 +10,7 @@
// licenses/APL.txt.
#include <chrono>
#include <cstdint>
#include <iostream>
#include <optional>
#include <thread>
@ -23,6 +24,7 @@
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/shard_rsm.hpp"
@ -88,7 +90,7 @@ msgs::PrimaryKey GetPrimaryKey(int64_t value) {
return primary_key;
}
msgs::NewVertex get_new_vertex(int64_t value) {
msgs::NewVertex GetNewVertex(int64_t value) {
// Specify Labels.
msgs::Label label1 = {.id = LabelId::FromUint(1)};
std::vector<msgs::Label> label_ids = {label1};
@ -98,12 +100,16 @@ msgs::NewVertex get_new_vertex(int64_t value) {
// Specify properties
auto val1 = msgs::Value(static_cast<int64_t>(value));
auto prop1 = std::make_pair(PropertyId::FromUint(0), val1);
auto prop1 = std::make_pair(PropertyId::FromUint(1), val1);
auto val3 = msgs::Value(static_cast<int64_t>(value));
auto prop3 = std::make_pair(PropertyId::FromUint(2), val3);
//(VERIFY) does the schema has to be specified with the properties or the primarykey?
auto val2 = msgs::Value(static_cast<int64_t>(value));
auto prop2 = std::make_pair(PropertyId::FromUint(1), val1);
auto prop2 = std::make_pair(PropertyId::FromUint(0), val2);
std::vector<std::pair<PropertyId, msgs::Value>> properties{prop1, prop2};
std::vector<std::pair<PropertyId, msgs::Value>> properties{prop1, prop2, prop3};
// NewVertex
return {.label_ids = label_ids, .primary_key = primary_key, .properties = properties};
@ -115,13 +121,33 @@ std::vector<std::vector<msgs::Value>> GetValuePrimaryKeysWithValue(int64_t value
return {{val}};
}
void Commit(ShardClient &client, const coordinator::Hlc &transaction_timestamp) {
coordinator::Hlc commit_timestamp{.logical_id = GetTransactionId()};
msgs::CommitRequest commit_req{};
commit_req.transaction_id = transaction_timestamp;
commit_req.commit_timestamp = commit_timestamp;
while (true) {
auto write_res = client.SendWriteRequest(commit_req);
if (write_res.HasError()) {
continue;
}
auto write_response_result = write_res.GetValue();
auto write_response = std::get<msgs::CommitResponse>(write_response_result);
break;
}
}
} // namespace
// attempts to sending different requests
namespace {
bool AttemtpToCreateVertex(ShardClient &client, int64_t value) {
msgs::NewVertex vertex = get_new_vertex(value);
bool AttemptToCreateVertex(ShardClient &client, int64_t value) {
msgs::NewVertex vertex = GetNewVertex(value);
auto create_req = msgs::CreateVerticesRequest{};
create_req.new_vertices = {vertex};
@ -131,13 +157,58 @@ bool AttemtpToCreateVertex(ShardClient &client, int64_t value) {
MG_ASSERT(write_res.HasValue() && std::get<msgs::CreateVerticesResponse>(write_res.GetValue()).success,
"Unexpected failure");
auto commit_req = msgs::CommitRequest{create_req.transaction_id, msgs::Hlc{.logical_id = GetTransactionId()}};
auto commit_res = client.SendWriteRequest(commit_req);
MG_ASSERT(commit_res.HasValue() && std::get<msgs::CommitResponse>(commit_res.GetValue()).success,
"Unexpected failure");
Commit(client, create_req.transaction_id);
return true;
}
bool AttemptToDeleteVertex(ShardClient &client, int64_t value) {
auto delete_req = msgs::DeleteVerticesRequest{};
delete_req.deletion_type = msgs::DeleteVerticesRequest::DeletionType::DELETE;
delete_req.primary_keys = GetValuePrimaryKeysWithValue(value);
delete_req.transaction_id.logical_id = GetTransactionId();
while (true) {
auto write_res = client.SendWriteRequest(delete_req);
if (write_res.HasError()) {
continue;
}
auto write_response_result = write_res.GetValue();
auto write_response = std::get<msgs::DeleteVerticesResponse>(write_response_result);
Commit(client, delete_req.transaction_id);
return write_response.success;
}
}
bool AttemptToUpdateVertex(ShardClient &client, int64_t value) {
auto vertex_id = GetValuePrimaryKeysWithValue(value)[0];
std::vector<std::pair<PropertyId, msgs::Value>> property_updates;
auto property_update = std::make_pair(PropertyId::FromUint(2), msgs::Value(static_cast<int64_t>(10000)));
auto vertex_prop = msgs::UpdateVertexProp{};
vertex_prop.primary_key = vertex_id;
vertex_prop.property_updates = {property_update};
auto update_req = msgs::UpdateVerticesRequest{};
update_req.transaction_id.logical_id = GetTransactionId();
update_req.new_properties = {vertex_prop};
while (true) {
auto write_res = client.SendWriteRequest(update_req);
if (write_res.HasError()) {
continue;
}
auto write_response_result = write_res.GetValue();
auto write_response = std::get<msgs::UpdateVerticesResponse>(write_response_result);
Commit(client, update_req.transaction_id);
return write_response.success;
}
}
bool AttemptToAddEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2, int64_t edge_gid,
int64_t edge_type_id) {
auto id = msgs::EdgeId{};
@ -155,6 +226,48 @@ bool AttemptToAddEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t va
edge.type = type;
edge.src = src;
edge.dst = dst;
edge.properties = std::nullopt;
msgs::CreateEdgesRequest create_req{};
create_req.edges = {edge};
create_req.transaction_id.logical_id = GetTransactionId();
while (true) {
auto write_res = client.SendWriteRequest(create_req);
if (write_res.HasError()) {
continue;
}
auto write_response_result = write_res.GetValue();
auto write_response = std::get<msgs::CreateEdgesResponse>(write_response_result);
Commit(client, create_req.transaction_id);
return write_response.success;
}
}
bool AttemptToAddEdgeWithProperties(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2,
int64_t edge_gid, uint64_t edge_prop_id, int64_t edge_prop_val,
const std::vector<uint64_t> &edge_type_id) {
auto id1 = msgs::EdgeId{};
msgs::Label label = {.id = get_primary_label()};
auto src = std::make_pair(label, GetPrimaryKey(value_of_vertex_1));
auto dst = std::make_pair(label, GetPrimaryKey(value_of_vertex_2));
id1.gid = edge_gid;
auto type1 = msgs::EdgeType{};
type1.id = edge_type_id[0];
auto edge_prop = std::make_pair(PropertyId::FromUint(edge_prop_id), msgs::Value(edge_prop_val));
auto edge = msgs::Edge{};
edge.id = id1;
edge.type = type1;
edge.src = src;
edge.dst = dst;
edge.properties = {edge_prop};
msgs::CreateEdgesRequest create_req{};
create_req.edges = {edge};
@ -164,13 +277,112 @@ bool AttemptToAddEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t va
MG_ASSERT(write_res.HasValue() && std::get<msgs::CreateEdgesResponse>(write_res.GetValue()).success,
"Unexpected failure");
auto commit_req = msgs::CommitRequest{create_req.transaction_id, msgs::Hlc{.logical_id = GetTransactionId()}};
auto commit_res = client.SendWriteRequest(commit_req);
MG_ASSERT(commit_res.HasValue() && std::get<msgs::CommitResponse>(commit_res.GetValue()).success,
"Unexpected failure");
Commit(client, create_req.transaction_id);
return true;
}
bool AttemptToDeleteEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2, int64_t edge_gid,
int64_t edge_type_id) {
auto id = msgs::EdgeId{};
msgs::Label label = {.id = get_primary_label()};
auto src = std::make_pair(label, GetPrimaryKey(value_of_vertex_1));
auto dst = std::make_pair(label, GetPrimaryKey(value_of_vertex_2));
id.gid = edge_gid;
auto type = msgs::EdgeType{};
type.id = edge_type_id;
auto edge = msgs::Edge{};
edge.id = id;
edge.type = type;
edge.src = {src};
edge.dst = {dst};
msgs::DeleteEdgesRequest delete_req{};
delete_req.edges = {edge};
delete_req.transaction_id.logical_id = GetTransactionId();
while (true) {
auto write_res = client.SendWriteRequest(delete_req);
if (write_res.HasError()) {
continue;
}
auto write_response_result = write_res.GetValue();
auto write_response = std::get<msgs::DeleteEdgesResponse>(write_response_result);
Commit(client, delete_req.transaction_id);
return write_response.success;
}
}
bool AttemptToUpdateEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2, int64_t edge_gid,
int64_t edge_type_id, uint64_t edge_prop_id, int64_t edge_prop_val) {
auto id = msgs::EdgeId{};
msgs::Label label = {.id = get_primary_label()};
auto src = std::make_pair(label, GetPrimaryKey(value_of_vertex_1));
auto dst = std::make_pair(label, GetPrimaryKey(value_of_vertex_2));
id.gid = edge_gid;
auto type = msgs::EdgeType{};
type.id = edge_type_id;
auto edge = msgs::Edge{};
edge.id = id;
edge.type = type;
auto edge_prop = std::vector<std::pair<PropertyId, msgs::Value>>{
std::make_pair(PropertyId::FromUint(edge_prop_id), msgs::Value(edge_prop_val))};
msgs::UpdateEdgeProp update_props{.src = src, .dst = dst, .edge_id = id, .property_updates = edge_prop};
msgs::UpdateEdgesRequest update_req{};
update_req.transaction_id.logical_id = GetTransactionId();
update_req.new_properties = {update_props};
while (true) {
auto write_res = client.SendWriteRequest(update_req);
if (write_res.HasError()) {
continue;
}
auto write_response_result = write_res.GetValue();
auto write_response = std::get<msgs::UpdateEdgesResponse>(write_response_result);
Commit(client, update_req.transaction_id);
return write_response.success;
}
}
std::tuple<size_t, std::optional<msgs::VertexId>> AttemptToScanAllWithoutBatchLimit(ShardClient &client,
msgs::VertexId start_id) {
msgs::ScanVerticesRequest scan_req{};
scan_req.batch_limit = {};
scan_req.filter_expressions = std::nullopt;
scan_req.props_to_return = std::nullopt;
scan_req.start_id = start_id;
scan_req.storage_view = msgs::StorageView::OLD;
scan_req.transaction_id.logical_id = GetTransactionId();
while (true) {
auto read_res = client.SendReadRequest(scan_req);
if (read_res.HasError()) {
continue;
}
auto write_response_result = read_res.GetValue();
auto write_response = std::get<msgs::ScanVerticesResponse>(write_response_result);
MG_ASSERT(write_response.success);
return {write_response.results.size(), write_response.next_start_id};
}
}
std::tuple<size_t, std::optional<msgs::VertexId>> AttemptToScanAllWithBatchLimit(ShardClient &client,
msgs::VertexId start_id,
uint64_t batch_limit) {
@ -192,23 +404,258 @@ std::tuple<size_t, std::optional<msgs::VertexId>> AttemptToScanAllWithBatchLimit
auto write_response = std::get<msgs::ScanVerticesResponse>(write_response_result);
MG_ASSERT(write_response.success);
return {write_response.results.size(), write_response.next_start_id};
}
}
void AttemptToExpandOneWithWrongEdgeType(ShardClient &client, uint64_t src_vertex_val, uint64_t edge_type_id) {
// Source vertex
msgs::Label label = {.id = get_primary_label()};
auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val));
// Edge type
auto edge_type = msgs::EdgeType{};
edge_type.id = edge_type_id + 1;
// Edge direction
auto edge_direction = msgs::EdgeDirection::OUT;
// Source Vertex properties to look for
std::optional<std::vector<PropertyId>> src_vertex_properties = {};
// Edge properties to look for
std::optional<std::vector<PropertyId>> edge_properties = {};
std::vector<msgs::Expression> expressions;
std::optional<std::vector<msgs::OrderBy>> order_by = {};
std::optional<size_t> limit = {};
std::optional<msgs::Filter> filter = {};
msgs::ExpandOneRequest expand_one_req{};
expand_one_req.direction = edge_direction;
expand_one_req.edge_properties = edge_properties;
expand_one_req.edge_types = {edge_type};
expand_one_req.expressions = expressions;
expand_one_req.filter = filter;
expand_one_req.limit = limit;
expand_one_req.order_by = order_by;
expand_one_req.src_vertex_properties = src_vertex_properties;
expand_one_req.src_vertices = {src_vertex};
expand_one_req.transaction_id.logical_id = GetTransactionId();
while (true) {
auto read_res = client.SendReadRequest(expand_one_req);
if (read_res.HasError()) {
continue;
}
auto write_response_result = read_res.GetValue();
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
MG_ASSERT(write_response.result.size() == 1);
MG_ASSERT(write_response.result[0].edges_with_all_properties);
MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 0);
MG_ASSERT(!write_response.result[0].edges_with_specific_properties);
break;
}
}
void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, uint64_t edge_type_id) {
// Source vertex
msgs::Label label = {.id = get_primary_label()};
auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val));
// Edge type
auto edge_type = msgs::EdgeType{};
edge_type.id = edge_type_id;
// Edge direction
auto edge_direction = msgs::EdgeDirection::OUT;
// Source Vertex properties to look for
std::optional<std::vector<PropertyId>> src_vertex_properties = {};
// Edge properties to look for
std::optional<std::vector<PropertyId>> edge_properties = {};
std::vector<msgs::Expression> expressions;
std::optional<std::vector<msgs::OrderBy>> order_by = {};
std::optional<size_t> limit = {};
std::optional<msgs::Filter> filter = {};
msgs::ExpandOneRequest expand_one_req{};
expand_one_req.direction = edge_direction;
expand_one_req.edge_properties = edge_properties;
expand_one_req.edge_types = {edge_type};
expand_one_req.expressions = expressions;
expand_one_req.filter = filter;
expand_one_req.limit = limit;
expand_one_req.order_by = order_by;
expand_one_req.src_vertex_properties = src_vertex_properties;
expand_one_req.src_vertices = {src_vertex};
expand_one_req.transaction_id.logical_id = GetTransactionId();
while (true) {
auto read_res = client.SendReadRequest(expand_one_req);
if (read_res.HasError()) {
continue;
}
auto write_response_result = read_res.GetValue();
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
MG_ASSERT(write_response.result.size() == 1);
MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 2);
auto number_of_properties_on_edge =
(std::get<std::map<PropertyId, msgs::Value>>(write_response.result[0].edges_with_all_properties.value()[0]))
.size();
MG_ASSERT(number_of_properties_on_edge == 1);
break;
}
}
void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uint64_t src_vertex_val,
uint64_t edge_type_id) {
// Source vertex
msgs::Label label = {.id = get_primary_label()};
auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val));
// Edge type
auto edge_type = msgs::EdgeType{};
edge_type.id = edge_type_id;
// Edge direction
auto edge_direction = msgs::EdgeDirection::OUT;
// Source Vertex properties to look for
std::vector<PropertyId> desired_src_vertex_props{PropertyId::FromUint(2)};
std::optional<std::vector<PropertyId>> src_vertex_properties = desired_src_vertex_props;
// Edge properties to look for
std::optional<std::vector<PropertyId>> edge_properties = {};
std::vector<msgs::Expression> expressions;
std::optional<std::vector<msgs::OrderBy>> order_by = {};
std::optional<size_t> limit = {};
std::optional<msgs::Filter> filter = {};
msgs::ExpandOneRequest expand_one_req{};
expand_one_req.direction = edge_direction;
expand_one_req.edge_properties = edge_properties;
expand_one_req.edge_types = {edge_type};
expand_one_req.expressions = expressions;
expand_one_req.filter = filter;
expand_one_req.limit = limit;
expand_one_req.order_by = order_by;
expand_one_req.src_vertex_properties = src_vertex_properties;
expand_one_req.src_vertices = {src_vertex};
expand_one_req.transaction_id.logical_id = GetTransactionId();
while (true) {
auto read_res = client.SendReadRequest(expand_one_req);
if (read_res.HasError()) {
continue;
}
auto write_response_result = read_res.GetValue();
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
MG_ASSERT(write_response.result.size() == 1);
auto src_vertex_props_size = write_response.result[0].src_vertex_properties->size();
MG_ASSERT(src_vertex_props_size == 1);
MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 2);
auto number_of_properties_on_edge =
(std::get<std::map<PropertyId, msgs::Value>>(write_response.result[0].edges_with_all_properties.value()[0]))
.size();
MG_ASSERT(number_of_properties_on_edge == 1);
break;
}
}
void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t src_vertex_val, uint64_t edge_type_id,
uint64_t edge_prop_id) {
// Source vertex
msgs::Label label = {.id = get_primary_label()};
auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val));
// Edge type
auto edge_type = msgs::EdgeType{};
edge_type.id = edge_type_id;
// Edge direction
auto edge_direction = msgs::EdgeDirection::OUT;
// Source Vertex properties to look for
std::optional<std::vector<PropertyId>> src_vertex_properties = {};
// Edge properties to look for
std::vector<PropertyId> specified_edge_prop{PropertyId::FromUint(edge_prop_id)};
std::optional<std::vector<PropertyId>> edge_properties = {specified_edge_prop};
std::vector<msgs::Expression> expressions;
std::optional<std::vector<msgs::OrderBy>> order_by = {};
std::optional<size_t> limit = {};
std::optional<msgs::Filter> filter = {};
msgs::ExpandOneRequest expand_one_req{};
expand_one_req.direction = edge_direction;
expand_one_req.edge_properties = edge_properties;
expand_one_req.edge_types = {edge_type};
expand_one_req.expressions = expressions;
expand_one_req.filter = filter;
expand_one_req.limit = limit;
expand_one_req.order_by = order_by;
expand_one_req.src_vertex_properties = src_vertex_properties;
expand_one_req.src_vertices = {src_vertex};
expand_one_req.transaction_id.logical_id = GetTransactionId();
while (true) {
auto read_res = client.SendReadRequest(expand_one_req);
if (read_res.HasError()) {
continue;
}
auto write_response_result = read_res.GetValue();
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
MG_ASSERT(write_response.result.size() == 1);
auto specific_properties_size =
(std::get<std::vector<msgs::Value>>(write_response.result[0].edges_with_specific_properties.value()[0]));
MG_ASSERT(specific_properties_size.size() == 1);
break;
}
}
} // namespace
// tests
namespace {
void TestCreateVertices(ShardClient &client) { MG_ASSERT(AttemtpToCreateVertex(client, GetUniqueInteger())); }
void TestCreateVertices(ShardClient &client) { MG_ASSERT(AttemptToCreateVertex(client, GetUniqueInteger())); }
void TestAddEdge(ShardClient &client) {
void TestCreateAndDeleteVertices(ShardClient &client) {
auto unique_prop_val = GetUniqueInteger();
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val));
MG_ASSERT(AttemptToDeleteVertex(client, unique_prop_val));
}
void TestCreateAndUpdateVertices(ShardClient &client) {
auto unique_prop_val = GetUniqueInteger();
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val));
MG_ASSERT(AttemptToUpdateVertex(client, unique_prop_val));
}
void TestCreateEdge(ShardClient &client) {
auto unique_prop_val_1 = GetUniqueInteger();
auto unique_prop_val_2 = GetUniqueInteger();
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_1));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_2));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_1));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2));
auto edge_gid = GetUniqueInteger();
auto edge_type_id = GetUniqueInteger();
@ -216,6 +663,46 @@ void TestAddEdge(ShardClient &client) {
MG_ASSERT(AttemptToAddEdge(client, unique_prop_val_1, unique_prop_val_2, edge_gid, edge_type_id));
}
void TestCreateAndDeleteEdge(ShardClient &client) {
// Add the Edge
auto unique_prop_val_1 = GetUniqueInteger();
auto unique_prop_val_2 = GetUniqueInteger();
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_1));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2));
auto edge_gid = GetUniqueInteger();
auto edge_type_id = GetUniqueInteger();
MG_ASSERT(AttemptToAddEdge(client, unique_prop_val_1, unique_prop_val_2, edge_gid, edge_type_id));
// Delete the Edge
MG_ASSERT(AttemptToDeleteEdge(client, unique_prop_val_1, unique_prop_val_2, edge_gid, edge_type_id));
}
void TestUpdateEdge(ShardClient &client) {
// Add the Edge
auto unique_prop_val_1 = GetUniqueInteger();
auto unique_prop_val_2 = GetUniqueInteger();
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_1));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2));
auto edge_gid = GetUniqueInteger();
auto edge_type_id = GetUniqueInteger();
auto edge_prop_id = GetUniqueInteger();
auto edge_prop_val_old = GetUniqueInteger();
auto edge_prop_val_new = GetUniqueInteger();
MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_2, edge_gid, edge_prop_id,
edge_prop_val_old, {edge_type_id}));
// Update the Edge
MG_ASSERT(AttemptToUpdateEdge(client, unique_prop_val_1, unique_prop_val_2, edge_gid, edge_type_id, edge_prop_id,
edge_prop_val_new));
}
void TestScanAllOneGo(ShardClient &client) {
auto unique_prop_val_1 = GetUniqueInteger();
auto unique_prop_val_2 = GetUniqueInteger();
@ -223,19 +710,22 @@ void TestScanAllOneGo(ShardClient &client) {
auto unique_prop_val_4 = GetUniqueInteger();
auto unique_prop_val_5 = GetUniqueInteger();
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_1));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_2));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_3));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_4));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_5));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_1));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_3));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_4));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_5));
msgs::Label prim_label = {.id = get_primary_label()};
msgs::PrimaryKey prim_key = {msgs::Value(static_cast<int64_t>(unique_prop_val_1))};
msgs::VertexId v_id = {prim_label, prim_key};
auto [result_size, next_id] = AttemptToScanAllWithBatchLimit(client, v_id, 5);
MG_ASSERT(result_size == 5);
auto [result_size_with_batch, next_id_with_batch] = AttemptToScanAllWithBatchLimit(client, v_id, 5);
auto [result_size_without_batch, next_id_without_batch] = AttemptToScanAllWithoutBatchLimit(client, v_id);
MG_ASSERT(result_size_with_batch == 5);
MG_ASSERT(result_size_without_batch == 5);
}
void TestScanAllWithSmallBatchSize(ShardClient &client) {
@ -250,16 +740,16 @@ void TestScanAllWithSmallBatchSize(ShardClient &client) {
auto unique_prop_val_9 = GetUniqueInteger();
auto unique_prop_val_10 = GetUniqueInteger();
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_1));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_2));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_3));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_4));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_5));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_6));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_7));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_8));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_9));
MG_ASSERT(AttemtpToCreateVertex(client, unique_prop_val_10));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_1));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_3));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_4));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_5));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_6));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_7));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_8));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_9));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_10));
msgs::Label prim_label = {.id = get_primary_label()};
msgs::PrimaryKey prim_key1 = {msgs::Value(static_cast<int64_t>(unique_prop_val_1))};
@ -280,6 +770,39 @@ void TestScanAllWithSmallBatchSize(ShardClient &client) {
MG_ASSERT(!next_id4);
}
void TestExpandOne(ShardClient &client) {
{
// ExpandOneSimple
auto unique_prop_val_1 = GetUniqueInteger();
auto unique_prop_val_2 = GetUniqueInteger();
auto unique_prop_val_3 = GetUniqueInteger();
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_1));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2));
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_3));
auto edge_type_id = GetUniqueInteger();
auto edge_gid_1 = GetUniqueInteger();
auto edge_gid_2 = GetUniqueInteger();
auto edge_prop_id = GetUniqueInteger();
auto edge_prop_val = GetUniqueInteger();
// (V1)-[edge_type_id]->(V2)
MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_2, edge_gid_1, edge_prop_id,
edge_prop_val, {edge_type_id}));
// (V1)-[edge_type_id]->(V3)
MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_3, edge_gid_2, edge_prop_id,
edge_prop_val, {edge_type_id}));
AttemptToExpandOneSimple(client, unique_prop_val_1, edge_type_id);
AttemptToExpandOneWithWrongEdgeType(client, unique_prop_val_1, edge_type_id);
AttemptToExpandOneWithSpecifiedSrcVertexProperties(client, unique_prop_val_1, edge_type_id);
AttemptToExpandOneWithSpecifiedEdgeProperties(client, unique_prop_val_1, edge_type_id, edge_prop_id);
}
}
} // namespace
int TestMessages() {
@ -312,13 +835,14 @@ int TestMessages() {
PropertyValue max_pk(static_cast<int64_t>(10000000));
std::vector<PropertyValue> max_prim_key = {max_pk};
auto shard_ptr1 = std::make_unique<Shard>(get_primary_label(), min_prim_key, max_prim_key);
auto shard_ptr2 = std::make_unique<Shard>(get_primary_label(), min_prim_key, max_prim_key);
auto shard_ptr3 = std::make_unique<Shard>(get_primary_label(), min_prim_key, max_prim_key);
std::vector<SchemaProperty> schema = {get_schema_property()};
auto shard_ptr1 = std::make_unique<Shard>(get_primary_label(), min_prim_key, max_prim_key, schema);
auto shard_ptr2 = std::make_unique<Shard>(get_primary_label(), min_prim_key, max_prim_key, schema);
auto shard_ptr3 = std::make_unique<Shard>(get_primary_label(), min_prim_key, max_prim_key, schema);
shard_ptr1->CreateSchema(get_primary_label(), {get_schema_property()});
shard_ptr2->CreateSchema(get_primary_label(), {get_schema_property()});
shard_ptr3->CreateSchema(get_primary_label(), {get_schema_property()});
shard_ptr1->CreateSchema(get_primary_label(), schema);
shard_ptr2->CreateSchema(get_primary_label(), schema);
shard_ptr3->CreateSchema(get_primary_label(), schema);
std::vector<Address> address_for_1{shard_server_2_address, shard_server_3_address};
std::vector<Address> address_for_2{shard_server_1_address, shard_server_3_address};
@ -341,11 +865,23 @@ int TestMessages() {
std::vector server_addrs = {shard_server_1_address, shard_server_2_address, shard_server_3_address};
ShardClient client(shard_client_io, shard_server_1_address, server_addrs);
// Vertex tests
TestCreateVertices(client);
TestAddEdge(client);
TestCreateAndDeleteVertices(client);
TestCreateAndUpdateVertices(client);
// Edge tests
TestCreateEdge(client);
TestCreateAndDeleteEdge(client);
TestUpdateEdge(client);
// ScanAll tests
TestScanAllOneGo(client);
TestScanAllWithSmallBatchSize(client);
// ExpandOne tests
TestExpandOne(client);
simulator.ShutDown();
SimulatorStats stats = simulator.Stats();

View File

@ -37,11 +37,7 @@ namespace memgraph::storage::v3::tests {
class StorageV3 : public ::testing::TestWithParam<bool> {
protected:
void SetUp() override {
store.StoreMapping({{1, "label"}, {2, "property"}});
ASSERT_TRUE(
store.CreateSchema(primary_label, {storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}));
}
void SetUp() override { store.StoreMapping({{1, "label"}, {2, "property"}}); }
void TearDown() override { CleanupHlc(last_hlc); }
@ -75,10 +71,14 @@ class StorageV3 : public ::testing::TestWithParam<bool> {
static constexpr std::chrono::seconds wall_clock_increment{10};
static constexpr std::chrono::seconds reclamation_interval{wall_clock_increment / 2};
static constexpr io::Duration one_time_unit{1};
const std::vector<PropertyValue> min_pk{PropertyValue{0}};
const std::vector<PropertyValue> pk{PropertyValue{0}};
const LabelId primary_label{LabelId::FromUint(1)};
const PropertyId primary_property{PropertyId::FromUint(2)};
Shard store{primary_label, pk, std::nullopt, Config{.gc = {.reclamation_interval = reclamation_interval}}};
std::vector<storage::v3::SchemaProperty> schema_property_vector = {
storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
Shard store{primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector,
Config{.gc = {.reclamation_interval = reclamation_interval}}};
coordinator::Hlc last_hlc{0, io::Time{}};
};
INSTANTIATE_TEST_CASE_P(WithGc, StorageV3, ::testing::Values(true));
@ -2664,9 +2664,7 @@ TEST_P(StorageV3, TestCreateVertexAndValidate) {
{
ASSERT_DEATH(
{
Shard store(primary_label, pk, std::nullopt);
ASSERT_TRUE(store.CreateSchema(primary_label,
{storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}));
Shard store(primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector);
auto acc = store.Access(GetNextHlc());
auto vertex1 = acc.CreateVertexAndValidate(primary_label, {}, {{primary_property, PropertyValue(0)}});
auto vertex2 = acc.CreateVertexAndValidate(primary_label, {}, {{primary_property, PropertyValue(0)}});

View File

@ -26,8 +26,6 @@ class StorageEdgeTest : public ::testing::TestWithParam<bool> {
protected:
void SetUp() override {
store.StoreMapping({{1, "label"}, {2, "property"}, {3, "et5"}, {4, "other"}, {5, "different_label"}});
ASSERT_TRUE(
store.CreateSchema(primary_label, {storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}));
}
[[nodiscard]] LabelId NameToLabelId(std::string_view label_name) { return store.NameToLabel(label_name); }
@ -50,14 +48,15 @@ class StorageEdgeTest : public ::testing::TestWithParam<bool> {
return last_hlc;
}
static constexpr int64_t min_primary_key_value{0};
static constexpr int64_t max_primary_key_value{10000};
const std::vector<PropertyValue> min_pk{PropertyValue{0}};
const std::vector<PropertyValue> max_pk{PropertyValue{10000}};
const LabelId primary_label{LabelId::FromUint(1)};
Shard store{primary_label,
{PropertyValue{min_primary_key_value}},
std::vector{PropertyValue{max_primary_key_value}},
Config{.items = {.properties_on_edges = GetParam()}}};
const PropertyId primary_property{PropertyId::FromUint(2)};
std::vector<storage::v3::SchemaProperty> schema_property_vector = {
storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
Shard store{primary_label, min_pk, max_pk, schema_property_vector,
Config{.items = {.properties_on_edges = GetParam()}}};
coordinator::Hlc last_hlc{0, io::Time{}};
};
@ -1406,7 +1405,7 @@ TEST_P(StorageEdgeTest, EdgeCreateFromLargerAbort) {
TEST_P(StorageEdgeTest, EdgeDeleteFromSmallerCommit) {
// Create vertex
const PropertyValue from_key{0};
const PropertyValue to_key{max_primary_key_value};
const PropertyValue to_key{max_pk};
const PropertyValue non_existing_key{2};
auto acc = store.Access(GetNextHlc());
const auto from_id = std::invoke(
@ -2510,7 +2509,7 @@ TEST_P(StorageEdgeTest, EdgeDeleteFromSmallerCommit) {
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_P(StorageEdgeTest, EdgeDeleteFromLargerAbort) {
// Create vertex
const PropertyValue from_key{max_primary_key_value};
const PropertyValue from_key{max_pk};
const PropertyValue to_key{0};
const PropertyValue non_existing_key{2};
auto acc = store.Access(GetNextHlc());

View File

@ -134,7 +134,10 @@ class ExpressionEvaluatorTest : public ::testing::Test {
PropertyId primary_property{PropertyId::FromInt(2)};
PrimaryKey min_pk{PropertyValue(0)};
Shard db{primary_label, min_pk, std::nullopt};
std::vector<storage::v3::SchemaProperty> schema_property_vector = {
storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
Shard db{primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector};
Shard::Accessor storage_dba{db.Access(GetNextHlc())};
DbAccessor dba{&storage_dba};
@ -148,11 +151,7 @@ class ExpressionEvaluatorTest : public ::testing::Test {
coordinator::Hlc last_hlc{0, io::Time{}};
void SetUp() override {
db.StoreMapping({{1, "label"}, {2, "property"}});
ASSERT_TRUE(
db.CreateSchema(primary_label, {storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}));
}
void SetUp() override { db.StoreMapping({{1, "label"}, {2, "property"}}); }
std::vector<PropertyId> NamesToProperties(const std::vector<std::string> &property_names) {
std::vector<PropertyId> properties;
@ -1108,8 +1107,6 @@ class ExpressionEvaluatorPropertyLookup : public ExpressionEvaluatorTest {
void SetUp() override {
identifier->MapTo(symbol);
db.StoreMapping({{1, "label"}, {2, "property"}, {3, "age"}, {4, "height"}});
ASSERT_TRUE(
db.CreateSchema(primary_label, {storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}));
}
auto Value(std::pair<std::string, PropertyId> property) {

View File

@ -37,14 +37,14 @@ class IndexTest : public testing::Test {
protected:
void SetUp() override {
storage.StoreMapping({{1, "label"}, {2, "property"}, {3, "label1"}, {4, "label2"}, {5, "id"}, {6, "val"}});
ASSERT_TRUE(
storage.CreateSchema(primary_label, {storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}));
}
const std::vector<PropertyValue> pk{PropertyValue{0}};
const LabelId primary_label{LabelId::FromUint(1)};
Shard storage{primary_label, pk, std::nullopt};
const PropertyId primary_property{PropertyId::FromUint(2)};
std::vector<storage::v3::SchemaProperty> schema_property_vector = {
storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
const std::vector<PropertyValue> min_pk{PropertyValue{0}};
const LabelId primary_label{LabelId::FromUint(1)};
Shard storage{primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector};
const PropertyId prop_id{PropertyId::FromUint(5)};
const PropertyId prop_val{PropertyId::FromUint(6)};

View File

@ -60,10 +60,13 @@ class StorageIsolationLevelTest : public ::testing::TestWithParam<IsolationLevel
}
NameIdMapper id_mapper;
static constexpr int64_t min_primary_key_value{0};
static constexpr int64_t max_primary_key_value{10000};
const LabelId primary_label{NameToLabelId("label")};
const std::vector<PropertyValue> min_pk{PropertyValue{0}};
const std::vector<PropertyValue> max_pk{PropertyValue{10000}};
const PropertyId primary_property{NameToPropertyId("property")};
std::vector<storage::v3::SchemaProperty> schema_property_vector = {
storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
const LabelId primary_label{NameToLabelId("label")};
coordinator::Hlc last_hlc{0, io::Time{}};
public:
@ -79,12 +82,8 @@ TEST_P(StorageIsolationLevelTest, Visibility) {
for (auto override_isolation_level_index{0U}; override_isolation_level_index < isolation_levels.size();
++override_isolation_level_index) {
Shard store{primary_label,
{PropertyValue{min_primary_key_value}},
std::vector{PropertyValue{max_primary_key_value}},
Shard store{primary_label, min_pk, max_pk, schema_property_vector,
Config{.transaction = {.isolation_level = default_isolation_level}}};
ASSERT_TRUE(
store.CreateSchema(primary_label, {storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}));
const auto override_isolation_level = isolation_levels[override_isolation_level_index];
auto creator = store.Access(GetNextHlc());
auto default_isolation_level_reader = store.Access(GetNextHlc());

View File

@ -10,6 +10,7 @@
// licenses/APL.txt.
#include <limits>
#include <optional>
#include <variant>
#include <gmock/gmock.h>
@ -31,10 +32,7 @@ namespace memgraph::storage::v3::tests {
class StorageV3Accessor : public ::testing::Test {
protected:
void SetUp() override {
storage.StoreMapping({{1, "label"}, {2, "property"}});
ASSERT_TRUE(storage.CreateSchema(primary_label, {SchemaProperty{primary_property, common::SchemaType::INT}}));
}
void SetUp() override { storage.StoreMapping({{1, "label"}, {2, "property"}}); }
VertexAccessor CreateVertexAndValidate(Shard::Accessor &acc, LabelId primary_label,
const std::vector<LabelId> &labels,
@ -54,10 +52,13 @@ class StorageV3Accessor : public ::testing::Test {
return last_hlc;
}
const std::vector<PropertyValue> pk{PropertyValue{0}};
const std::vector<PropertyValue> min_pk{PropertyValue{0}};
const LabelId primary_label{LabelId::FromUint(1)};
const PropertyId primary_property{PropertyId::FromUint(2)};
Shard storage{primary_label, pk, std::nullopt};
std::vector<storage::v3::SchemaProperty> schema_property_vector = {
storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
Shard storage{primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector};
coordinator::Hlc last_hlc{0, io::Time{}};
};