Merge branch 'project-pineapples' into T0995-MG-implement-top-error-handling-storage
This commit is contained in:
commit
b3eec92525
@ -36,7 +36,7 @@
|
||||
// This cannot be avoided by simple include orderings so we
|
||||
// simply undefine those macros as we're sure that libkrb5
|
||||
// won't and can't be used anywhere in the query engine.
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
|
@ -47,7 +47,6 @@
|
||||
#include "query/v2/shard_request_manager.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
#include "utils/csv_parsing.hpp"
|
||||
#include "utils/event_counter.hpp"
|
||||
|
@ -456,10 +456,12 @@ struct ExpandOneResponse {
|
||||
std::vector<ExpandOneResultRow> result;
|
||||
};
|
||||
|
||||
struct UpdateVertexProp {
|
||||
struct UpdateVertex {
|
||||
PrimaryKey primary_key;
|
||||
// This should be a map
|
||||
std::vector<std::pair<PropertyId, Value>> property_updates;
|
||||
// Labels are first added and then removed from vertices
|
||||
std::vector<LabelId> add_labels;
|
||||
std::vector<LabelId> remove_labels;
|
||||
std::map<PropertyId, Value> property_updates;
|
||||
};
|
||||
|
||||
struct UpdateEdgeProp {
|
||||
@ -502,7 +504,7 @@ struct DeleteVerticesResponse {
|
||||
|
||||
struct UpdateVerticesRequest {
|
||||
Hlc transaction_id;
|
||||
std::vector<UpdateVertexProp> new_properties;
|
||||
std::vector<UpdateVertex> update_vertices;
|
||||
};
|
||||
|
||||
struct UpdateVerticesResponse {
|
||||
|
@ -16,12 +16,10 @@ set(storage_v3_src_files
|
||||
schemas.cpp
|
||||
schema_validator.cpp
|
||||
shard.cpp
|
||||
storage.cpp
|
||||
shard_rsm.cpp
|
||||
bindings/typed_value.cpp
|
||||
expr.cpp
|
||||
request_helper.cpp
|
||||
storage.cpp)
|
||||
request_helper.cpp)
|
||||
|
||||
# ######################
|
||||
find_package(gflags REQUIRED)
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "storage/v3/bindings/typed_value.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/vertex_accessor.hpp"
|
||||
#include "utils/template_utils.hpp"
|
||||
|
||||
namespace memgraph::storage::v3 {
|
||||
|
||||
@ -113,4 +114,5 @@ VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_itera
|
||||
std::vector<Element>::const_iterator GetStartOrderedElementsIterator(const std::vector<Element> &ordered_elements,
|
||||
const std::vector<PropertyValue> &start_ids,
|
||||
View view);
|
||||
|
||||
} // namespace memgraph::storage::v3
|
||||
|
@ -38,7 +38,6 @@
|
||||
#include "storage/v3/schemas.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/shard_rsm.hpp"
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "storage/v3/value_conversions.hpp"
|
||||
#include "storage/v3/vertex_accessor.hpp"
|
||||
#include "storage/v3/vertex_id.hpp"
|
||||
@ -58,11 +57,11 @@ using conversions::ToPropertyValue;
|
||||
namespace {
|
||||
namespace msgs = msgs;
|
||||
|
||||
using AllEdgePropertyDataSructure = std::map<PropertyId, msgs::Value>;
|
||||
using SpecificEdgePropertyDataSructure = std::vector<msgs::Value>;
|
||||
using AllEdgePropertyDataStructure = std::map<PropertyId, msgs::Value>;
|
||||
using SpecificEdgePropertyDataStructure = std::vector<msgs::Value>;
|
||||
|
||||
using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataSructure>;
|
||||
using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataSructure>;
|
||||
using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataStructure>;
|
||||
using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataStructure>;
|
||||
|
||||
using SpecificEdgePropertiesVector = std::vector<SpecificEdgeProperties>;
|
||||
using AllEdgePropertiesVector = std::vector<AllEdgeProperties>;
|
||||
@ -71,7 +70,7 @@ using EdgeAccessors = std::vector<storage::v3::EdgeAccessor>;
|
||||
|
||||
using EdgeFiller =
|
||||
std::function<ShardResult<void>(const EdgeAccessor &edge, bool is_in_edge, msgs::ExpandOneResultRow &result_row)>;
|
||||
using EdgeUniqunessFunction = std::function<EdgeAccessors(EdgeAccessors &&, msgs::EdgeDirection)>;
|
||||
using EdgeUniquenessFunction = std::function<EdgeAccessors(EdgeAccessors &&, msgs::EdgeDirection)>;
|
||||
|
||||
struct VertexIdCmpr {
|
||||
bool operator()(const storage::v3::VertexId *lhs, const storage::v3::VertexId *rhs) const { return *lhs < *rhs; }
|
||||
@ -182,8 +181,6 @@ std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexA
|
||||
return evaluated_expressions;
|
||||
}
|
||||
|
||||
struct LocalError {};
|
||||
|
||||
ShardResult<std::vector<msgs::Label>> FillUpSourceVertexSecondaryLabels(const std::optional<VertexAccessor> &v_acc,
|
||||
const msgs::ExpandOneRequest &req) {
|
||||
auto secondary_labels = v_acc->Labels(View::NEW);
|
||||
@ -244,7 +241,7 @@ ShardResult<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std:
|
||||
|
||||
ShardResult<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges(
|
||||
const std::optional<VertexAccessor> &v_acc, const msgs::ExpandOneRequest &req,
|
||||
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness) {
|
||||
const EdgeUniquenessFunction &maybe_filter_based_on_edge_uniquness) {
|
||||
std::vector<EdgeTypeId> edge_types{};
|
||||
edge_types.reserve(req.edge_types.size());
|
||||
std::transform(req.edge_types.begin(), req.edge_types.end(), std::back_inserter(edge_types),
|
||||
@ -298,18 +295,15 @@ ShardResult<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges(
|
||||
return std::array<std::vector<EdgeAccessor>, 2>{in_edges, out_edges};
|
||||
}
|
||||
|
||||
using AllEdgePropertyDataSructure = std::map<PropertyId, msgs::Value>;
|
||||
using SpecificEdgePropertyDataSructure = std::vector<msgs::Value>;
|
||||
using AllEdgePropertyDataStructure = std::map<PropertyId, msgs::Value>;
|
||||
using SpecificEdgePropertyDataStructure = std::vector<msgs::Value>;
|
||||
|
||||
using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataSructure>;
|
||||
using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataSructure>;
|
||||
using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataStructure>;
|
||||
using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataStructure>;
|
||||
|
||||
using SpecificEdgePropertiesVector = std::vector<SpecificEdgeProperties>;
|
||||
using AllEdgePropertiesVector = std::vector<AllEdgeProperties>;
|
||||
|
||||
using EdgeFiller =
|
||||
std::function<ShardResult<void>(const EdgeAccessor &edge, bool is_in_edge, msgs::ExpandOneResultRow &result_row)>;
|
||||
|
||||
template <bool are_in_edges>
|
||||
ShardResult<void> FillEdges(const std::vector<EdgeAccessor> &edges, msgs::ExpandOneResultRow &row,
|
||||
const EdgeFiller &edge_filler) {
|
||||
@ -323,7 +317,7 @@ ShardResult<void> FillEdges(const std::vector<EdgeAccessor> &edges, msgs::Expand
|
||||
|
||||
ShardResult<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
|
||||
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
|
||||
const EdgeUniquenessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
|
||||
const Schemas::Schema *schema) {
|
||||
/// Fill up source vertex
|
||||
const auto primary_key = ConvertPropertyVector(src_vertex.second);
|
||||
@ -365,9 +359,9 @@ ShardResult<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
return result_row;
|
||||
}
|
||||
|
||||
EdgeUniqunessFunction InitializeEdgeUniqunessFunction(bool only_unique_neighbor_rows) {
|
||||
EdgeUniquenessFunction InitializeEdgeUniquenessFunction(bool only_unique_neighbor_rows) {
|
||||
// Functions to select connecting edges based on uniquness
|
||||
EdgeUniqunessFunction maybe_filter_based_on_edge_uniquness;
|
||||
EdgeUniquenessFunction maybe_filter_based_on_edge_uniquness;
|
||||
|
||||
if (only_unique_neighbor_rows) {
|
||||
maybe_filter_based_on_edge_uniquness = [](EdgeAccessors &&edges,
|
||||
@ -514,25 +508,33 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateVerticesRequest &&req) {
|
||||
auto acc = shard_->Access(req.transaction_id);
|
||||
|
||||
std::optional<msgs::ShardError> shard_error;
|
||||
|
||||
for (auto &vertex : req.new_properties) {
|
||||
if (shard_error) {
|
||||
break;
|
||||
}
|
||||
|
||||
for (auto &vertex : req.update_vertices) {
|
||||
auto vertex_to_update = acc.FindVertex(ConvertPropertyVector(std::move(vertex.primary_key)), View::OLD);
|
||||
if (!vertex_to_update) {
|
||||
shard_error.emplace(msgs::ShardError{common::ErrorCode::OBJECT_NOT_FOUND});
|
||||
spdlog::debug("In transaction {} vertex could not be found while trying to update its properties.",
|
||||
req.transaction_id.logical_id);
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
|
||||
for (const auto label : vertex.add_labels) {
|
||||
if (const auto maybe_error = vertex_to_update->AddLabelAndValidate(label); maybe_error.HasError()) {
|
||||
shard_error.emplace(CreateErrorResponse(maybe_error.GetError(), req.transaction_id, "adding label"));
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (const auto label : vertex.remove_labels) {
|
||||
if (const auto maybe_error = vertex_to_update->RemoveLabelAndValidate(label); maybe_error.HasError()) {
|
||||
shard_error.emplace(CreateErrorResponse(maybe_error.GetError(), req.transaction_id, "adding label"));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (auto &update_prop : vertex.property_updates) {
|
||||
auto result_schema =
|
||||
vertex_to_update->SetPropertyAndValidate(update_prop.first, ToPropertyValue(std::move(update_prop.second)));
|
||||
if (result_schema.HasError()) {
|
||||
shard_error.emplace(CreateErrorResponse(result_schema.GetError(), req.transaction_id, "updating vertices"));
|
||||
if (const auto result_schema = vertex_to_update->SetPropertyAndValidate(
|
||||
update_prop.first, ToPropertyValue(std::move(update_prop.second)));
|
||||
result_schema.HasError()) {
|
||||
shard_error.emplace(CreateErrorResponse(result_schema.GetError(), req.transaction_id, "adding label"));
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -819,7 +821,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
|
||||
|
||||
std::vector<msgs::ExpandOneResultRow> results;
|
||||
|
||||
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows);
|
||||
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniquenessFunction(req.only_unique_neighbor_rows);
|
||||
auto edge_filler = InitializeEdgeFillerFunction(req);
|
||||
|
||||
for (auto &src_vertex : req.src_vertices) {
|
||||
|
@ -21,9 +21,6 @@
|
||||
|
||||
namespace memgraph::storage::v3 {
|
||||
|
||||
template <typename>
|
||||
constexpr auto kAlwaysFalse = false;
|
||||
|
||||
class ShardRsm {
|
||||
std::unique_ptr<Shard> shard_;
|
||||
|
||||
|
@ -1,34 +0,0 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include <boost/asio/thread_pool.hpp>
|
||||
|
||||
#include "storage/v3/shard.hpp"
|
||||
|
||||
namespace memgraph::storage::v3 {
|
||||
|
||||
class Storage {
|
||||
public:
|
||||
explicit Storage(Config config);
|
||||
// Interface toward shard manipulation
|
||||
// Shard handler -> will use rsm client
|
||||
|
||||
private:
|
||||
std::vector<Shard> shards_;
|
||||
boost::asio::thread_pool shard_handlers_;
|
||||
Config config_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::storage::v3
|
@ -9,12 +9,11 @@
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "storage/v3/storage.hpp"
|
||||
#pragma once
|
||||
|
||||
#include "storage/v3/config.hpp"
|
||||
namespace memgraph::utils {
|
||||
|
||||
namespace memgraph::storage::v3 {
|
||||
template <typename>
|
||||
constexpr auto kAlwaysFalse{false};
|
||||
|
||||
Storage::Storage(Config config) : config_{config} {}
|
||||
|
||||
} // namespace memgraph::storage::v3
|
||||
} // namespace memgraph::utils
|
@ -183,19 +183,53 @@ bool AttemptToDeleteVertex(ShardClient &client, int64_t value) {
|
||||
}
|
||||
}
|
||||
|
||||
bool AttemptToUpdateVertex(ShardClient &client, int64_t value) {
|
||||
auto vertex_id = GetValuePrimaryKeysWithValue(value)[0];
|
||||
bool AttemptToUpdateVertex(ShardClient &client, int64_t vertex_primary_key, std::vector<LabelId> add_labels = {},
|
||||
std::vector<LabelId> remove_labels = {}) {
|
||||
auto vertex_id = GetValuePrimaryKeysWithValue(vertex_primary_key)[0];
|
||||
|
||||
std::vector<std::pair<PropertyId, msgs::Value>> property_updates;
|
||||
auto property_update = std::make_pair(PropertyId::FromUint(5), msgs::Value(static_cast<int64_t>(10000)));
|
||||
|
||||
auto vertex_prop = msgs::UpdateVertexProp{};
|
||||
vertex_prop.primary_key = vertex_id;
|
||||
vertex_prop.property_updates = {property_update};
|
||||
msgs::UpdateVertex update_vertex;
|
||||
update_vertex.primary_key = vertex_id;
|
||||
update_vertex.property_updates = {property_update};
|
||||
update_vertex.add_labels = add_labels;
|
||||
update_vertex.remove_labels = remove_labels;
|
||||
|
||||
auto update_req = msgs::UpdateVerticesRequest{};
|
||||
msgs::UpdateVerticesRequest update_req;
|
||||
update_req.transaction_id.logical_id = GetTransactionId();
|
||||
update_req.new_properties = {vertex_prop};
|
||||
update_req.update_vertices = {update_vertex};
|
||||
|
||||
while (true) {
|
||||
auto write_res = client.SendWriteRequest(update_req);
|
||||
if (write_res.HasError()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto write_response_result = write_res.GetValue();
|
||||
auto write_response = std::get<msgs::UpdateVerticesResponse>(write_response_result);
|
||||
|
||||
Commit(client, update_req.transaction_id);
|
||||
return !write_response.error.has_value();
|
||||
}
|
||||
}
|
||||
|
||||
bool AttemptToRemoveVertexProperty(ShardClient &client, int64_t primary_key, std::vector<LabelId> add_labels = {},
|
||||
std::vector<LabelId> remove_labels = {}) {
|
||||
auto vertex_id = GetValuePrimaryKeysWithValue(primary_key)[0];
|
||||
|
||||
std::vector<std::pair<PropertyId, msgs::Value>> property_updates;
|
||||
auto property_update = std::make_pair(PropertyId::FromUint(5), msgs::Value());
|
||||
|
||||
msgs::UpdateVertex update_vertex;
|
||||
update_vertex.primary_key = vertex_id;
|
||||
update_vertex.property_updates = {property_update};
|
||||
update_vertex.add_labels = add_labels;
|
||||
update_vertex.remove_labels = remove_labels;
|
||||
|
||||
msgs::UpdateVerticesRequest update_req;
|
||||
update_req.transaction_id.logical_id = GetTransactionId();
|
||||
update_req.update_vertices = {update_vertex};
|
||||
|
||||
while (true) {
|
||||
auto write_res = client.SendWriteRequest(update_req);
|
||||
@ -872,7 +906,9 @@ void TestCreateAndUpdateVertices(ShardClient &client) {
|
||||
auto unique_prop_val = GetUniqueInteger();
|
||||
|
||||
MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val));
|
||||
MG_ASSERT(AttemptToUpdateVertex(client, unique_prop_val));
|
||||
MG_ASSERT(AttemptToUpdateVertex(client, unique_prop_val, {LabelId::FromInt(3)}, {}));
|
||||
MG_ASSERT(AttemptToUpdateVertex(client, unique_prop_val, {}, {LabelId::FromInt(3)}));
|
||||
MG_ASSERT(AttemptToRemoveVertexProperty(client, unique_prop_val));
|
||||
}
|
||||
|
||||
void TestCreateEdge(ShardClient &client) {
|
||||
|
@ -343,6 +343,9 @@ target_link_libraries(${test_prefix}storage_v3_edge mg-storage-v3)
|
||||
add_unit_test(storage_v3_isolation_level.cpp)
|
||||
target_link_libraries(${test_prefix}storage_v3_isolation_level mg-storage-v3)
|
||||
|
||||
add_unit_test(storage_v3_shard_rsm.cpp)
|
||||
target_link_libraries(${test_prefix}storage_v3_shard_rsm mg-storage-v3)
|
||||
|
||||
add_unit_test(replication_persistence_helper.cpp)
|
||||
target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2)
|
||||
|
||||
|
@ -21,7 +21,7 @@
|
||||
#include "query/v2/context.hpp"
|
||||
#include "query/v2/db_accessor.hpp"
|
||||
#include "query/v2/plan/operator.hpp"
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
#include "query_v2_query_common.hpp"
|
||||
|
@ -30,7 +30,7 @@
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "storage/v3/schemas.hpp"
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/vertex.hpp"
|
||||
#include "storage/v3/view.hpp"
|
||||
|
||||
|
@ -21,7 +21,7 @@
|
||||
|
||||
#include "query/v2/interpreter.hpp"
|
||||
#include "result_stream_faker.hpp"
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
|
||||
DECLARE_bool(query_cost_planner);
|
||||
|
||||
|
@ -15,7 +15,7 @@
|
||||
#include "query/v2/plan/operator.hpp"
|
||||
#include "query_v2_query_plan_common.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
|
||||
namespace memgraph::query::v2::tests {
|
||||
|
||||
|
@ -16,7 +16,7 @@
|
||||
#include "glue/v2/communication.hpp"
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
|
||||
/**
|
||||
|
@ -18,7 +18,7 @@
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "storage/v3/name_id_mapper.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/temporal.hpp"
|
||||
#include "storage/v3/view.hpp"
|
||||
|
||||
|
@ -13,7 +13,7 @@
|
||||
|
||||
#include "storage/v3/isolation_level.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
|
||||
namespace memgraph::storage::v3::tests {
|
||||
int64_t VerticesCount(Shard::Accessor &accessor) {
|
||||
|
@ -23,7 +23,7 @@
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "storage/v3/schema_validator.hpp"
|
||||
#include "storage/v3/schemas.hpp"
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/temporal.hpp"
|
||||
|
||||
using testing::Pair;
|
||||
|
316
tests/unit/storage_v3_shard_rsm.cpp
Normal file
316
tests/unit/storage_v3_shard_rsm.cpp
Normal file
@ -0,0 +1,316 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <gmock/gmock-matchers.h>
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
#include "common/types.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "storage/v3/schema_validator.hpp"
|
||||
#include "storage/v3/schemas.hpp"
|
||||
#include "storage/v3/shard_rsm.hpp"
|
||||
#include "storage/v3/temporal.hpp"
|
||||
#include "storage/v3/vertex_id.hpp"
|
||||
|
||||
using testing::Pair;
|
||||
using testing::UnorderedElementsAre;
|
||||
using SchemaType = memgraph::common::SchemaType;
|
||||
|
||||
namespace memgraph::storage::v3::tests {
|
||||
|
||||
uint64_t GetTransactionId() {
|
||||
static uint64_t transaction_id = 0;
|
||||
return transaction_id++;
|
||||
}
|
||||
|
||||
class ShardRSMTest : public testing::Test {
|
||||
private:
|
||||
NameIdMapper id_mapper_{{{1, "primary_label"},
|
||||
{2, "primary_label2"},
|
||||
{3, "label"},
|
||||
{4, "primary_prop1"},
|
||||
{5, "primary_prop2"},
|
||||
{6, "prop"}}};
|
||||
|
||||
protected:
|
||||
ShardRSMTest() {
|
||||
PropertyValue min_pk(static_cast<int64_t>(0));
|
||||
std::vector<PropertyValue> min_prim_key = {min_pk};
|
||||
PropertyValue max_pk(static_cast<int64_t>(10000000));
|
||||
std::vector<PropertyValue> max_prim_key = {max_pk};
|
||||
|
||||
auto shard_ptr1 = std::make_unique<Shard>(primary_label, min_prim_key, max_prim_key, std::vector{schema_prop});
|
||||
shard_ptr1->StoreMapping({{1, "primary_label"},
|
||||
{2, "primary_label2"},
|
||||
{3, "label"},
|
||||
{4, "primary_prop1"},
|
||||
{5, "primary_prop2"},
|
||||
{6, "prop"}});
|
||||
shard_ptr1->CreateSchema(primary_label2, {{primary_property2, SchemaType::INT}});
|
||||
shard_rsm = std::make_unique<ShardRsm>(std::move(shard_ptr1));
|
||||
}
|
||||
|
||||
LabelId NameToLabel(const std::string &name) { return LabelId::FromUint(id_mapper_.NameToId(name)); }
|
||||
|
||||
PropertyId NameToProperty(const std::string &name) { return PropertyId::FromUint(id_mapper_.NameToId(name)); }
|
||||
|
||||
auto Commit(const auto &req) {
|
||||
const coordinator::Hlc commit_timestamp{GetTransactionId()};
|
||||
msgs::CommitRequest commit_req;
|
||||
commit_req.transaction_id = req.transaction_id;
|
||||
commit_req.commit_timestamp = commit_timestamp;
|
||||
return shard_rsm->Apply(commit_req);
|
||||
}
|
||||
|
||||
void CreateVertex(const msgs::PrimaryKey &primary_key, const std::vector<msgs::Label> labels,
|
||||
const std::vector<std::pair<msgs::PropertyId, msgs::Value>> &properties) {
|
||||
msgs::NewVertex vertex = {labels, primary_key, properties};
|
||||
msgs::CreateVerticesRequest create_req;
|
||||
create_req.new_vertices = {vertex};
|
||||
create_req.new_vertices = {vertex};
|
||||
create_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
auto write_res = shard_rsm->Apply(create_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::CreateVerticesResponse>(write_res));
|
||||
|
||||
auto commit_res = Commit(create_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::CommitResponse>(commit_res));
|
||||
ASSERT_FALSE(std::get<msgs::CommitResponse>(commit_res).error.has_value());
|
||||
}
|
||||
|
||||
void AssertVertexExists(const msgs::PrimaryKey &primary_key, const std::vector<msgs::Label> &labels,
|
||||
const std::vector<std::pair<msgs::PropertyId, msgs::Value>> &properties) {
|
||||
msgs::ScanVerticesRequest scan_req;
|
||||
scan_req.props_to_return = std::nullopt;
|
||||
scan_req.start_id = msgs::VertexId{msgs::Label{.id = primary_label}, primary_key};
|
||||
scan_req.storage_view = msgs::StorageView::OLD;
|
||||
scan_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
// Make request
|
||||
auto maybe_read_res = shard_rsm->Read(scan_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::ScanVerticesResponse>(maybe_read_res));
|
||||
const auto read_res = std::get<msgs::ScanVerticesResponse>(maybe_read_res);
|
||||
EXPECT_FALSE(read_res.error.has_value());
|
||||
EXPECT_EQ(read_res.results.size(), 1);
|
||||
|
||||
// Read results
|
||||
const auto res = read_res.results[0];
|
||||
const auto vtx_id = msgs::VertexId{msgs::Label{.id = primary_label}, primary_key};
|
||||
EXPECT_EQ(res.vertex.id, vtx_id);
|
||||
EXPECT_EQ(res.vertex.labels, labels);
|
||||
EXPECT_EQ(res.props, properties);
|
||||
}
|
||||
|
||||
LabelId primary_label{NameToLabel("primary_label")};
|
||||
LabelId primary_label2{NameToLabel("primary_label2")};
|
||||
LabelId label{NameToLabel("label")};
|
||||
PropertyId primary_property1{NameToProperty("primary_prop1")};
|
||||
PropertyId primary_property2{NameToProperty("primary_prop2")};
|
||||
PropertyId prop{NameToProperty("prop")};
|
||||
SchemaProperty schema_prop{primary_property1, SchemaType::INT};
|
||||
std::unique_ptr<ShardRsm> shard_rsm;
|
||||
};
|
||||
|
||||
TEST_F(ShardRSMTest, TestUpdateVertexSecondaryProperty) {
|
||||
const msgs::Value primary_key_val{static_cast<int64_t>(1)};
|
||||
const msgs::PrimaryKey pk{primary_key_val};
|
||||
|
||||
// Create Vertex
|
||||
CreateVertex(pk, {}, {});
|
||||
|
||||
// Add property prop
|
||||
static constexpr int64_t updated_vertex_id{10};
|
||||
{
|
||||
msgs::UpdateVerticesRequest update_req;
|
||||
update_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
update_req.update_vertices =
|
||||
std::vector<msgs::UpdateVertex>{{pk, {}, {}, {{msgs::PropertyId(prop), msgs::Value(updated_vertex_id)}}}};
|
||||
|
||||
const auto write_res = shard_rsm->Apply(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::UpdateVerticesResponse>(write_res));
|
||||
EXPECT_FALSE(std::get<msgs::UpdateVerticesResponse>(write_res).error.has_value());
|
||||
|
||||
const auto commit_res = Commit(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::CommitResponse>(commit_res));
|
||||
EXPECT_FALSE(std::get<msgs::CommitResponse>(commit_res).error.has_value());
|
||||
}
|
||||
AssertVertexExists(pk, {}, {{primary_property1, primary_key_val}, {prop, msgs::Value(updated_vertex_id)}});
|
||||
|
||||
// Update property prop
|
||||
static constexpr int64_t updated_vertex_id_2{101};
|
||||
{
|
||||
msgs::UpdateVerticesRequest update_req;
|
||||
update_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
update_req.update_vertices =
|
||||
std::vector<msgs::UpdateVertex>{{pk, {}, {}, {{msgs::PropertyId(prop), msgs::Value(updated_vertex_id_2)}}}};
|
||||
|
||||
const auto write_res = shard_rsm->Apply(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::UpdateVerticesResponse>(write_res));
|
||||
EXPECT_FALSE(std::get<msgs::UpdateVerticesResponse>(write_res).error.has_value());
|
||||
|
||||
const auto commit_res = Commit(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::CommitResponse>(commit_res));
|
||||
EXPECT_FALSE(std::get<msgs::CommitResponse>(commit_res).error.has_value());
|
||||
AssertVertexExists(pk, {}, {{primary_property1, primary_key_val}, {prop, msgs::Value(updated_vertex_id_2)}});
|
||||
}
|
||||
AssertVertexExists(pk, {}, {{primary_property1, primary_key_val}, {prop, msgs::Value(updated_vertex_id_2)}});
|
||||
|
||||
// Remove property prop
|
||||
{
|
||||
msgs::UpdateVerticesRequest update_req;
|
||||
update_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
update_req.update_vertices =
|
||||
std::vector<msgs::UpdateVertex>{{pk, {}, {}, {{msgs::PropertyId(prop), msgs::Value()}}}};
|
||||
|
||||
const auto write_res = shard_rsm->Apply(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::UpdateVerticesResponse>(write_res));
|
||||
EXPECT_FALSE(std::get<msgs::UpdateVerticesResponse>(write_res).error.has_value());
|
||||
|
||||
const auto commit_res = Commit(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::CommitResponse>(commit_res));
|
||||
EXPECT_FALSE(std::get<msgs::CommitResponse>(commit_res).error.has_value());
|
||||
}
|
||||
AssertVertexExists(pk, {}, {{primary_property1, primary_key_val}});
|
||||
}
|
||||
|
||||
TEST_F(ShardRSMTest, TestUpdateVertexPrimaryProperty) {
|
||||
const msgs::Value primary_key_val{static_cast<int64_t>(1)};
|
||||
const msgs::PrimaryKey pk{primary_key_val};
|
||||
|
||||
// Create Vertex
|
||||
CreateVertex(pk, {}, {});
|
||||
|
||||
// Try to update primary property
|
||||
static constexpr int64_t updated_vertex_id{10};
|
||||
{
|
||||
msgs::UpdateVerticesRequest update_req;
|
||||
update_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
update_req.update_vertices = std::vector<msgs::UpdateVertex>{
|
||||
{pk, {}, {}, {{msgs::PropertyId(primary_property1), msgs::Value(updated_vertex_id)}}}};
|
||||
|
||||
const auto write_res = shard_rsm->Apply(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::UpdateVerticesResponse>(write_res));
|
||||
EXPECT_TRUE(std::get<msgs::UpdateVerticesResponse>(write_res).error.has_value());
|
||||
}
|
||||
AssertVertexExists(pk, {}, {{primary_property1, primary_key_val}});
|
||||
// Try to update primary property of another schema
|
||||
{
|
||||
msgs::UpdateVerticesRequest update_req;
|
||||
update_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
update_req.update_vertices = std::vector<msgs::UpdateVertex>{
|
||||
{pk, {}, {}, {{msgs::PropertyId(primary_property2), msgs::Value(updated_vertex_id)}}}};
|
||||
|
||||
const auto write_res = shard_rsm->Apply(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::UpdateVerticesResponse>(write_res));
|
||||
EXPECT_FALSE(std::get<msgs::UpdateVerticesResponse>(write_res).error.has_value());
|
||||
|
||||
const auto commit_res = Commit(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::CommitResponse>(commit_res));
|
||||
EXPECT_FALSE(std::get<msgs::CommitResponse>(commit_res).error.has_value());
|
||||
}
|
||||
AssertVertexExists(pk, {},
|
||||
{{primary_property1, primary_key_val}, {primary_property2, msgs::Value(updated_vertex_id)}});
|
||||
}
|
||||
|
||||
TEST_F(ShardRSMTest, TestUpdateSecondaryLabel) {
|
||||
const msgs::Value primary_key_val{static_cast<int64_t>(1)};
|
||||
const msgs::PrimaryKey pk{primary_key_val};
|
||||
|
||||
// Create Vertex
|
||||
CreateVertex(pk, {}, {});
|
||||
|
||||
// Add label label
|
||||
const msgs::Label secondary_label{label};
|
||||
{
|
||||
msgs::UpdateVerticesRequest update_req;
|
||||
update_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
update_req.update_vertices = std::vector<msgs::UpdateVertex>{{pk, {label}, {}, {}}};
|
||||
|
||||
const auto write_res = shard_rsm->Apply(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::UpdateVerticesResponse>(write_res));
|
||||
EXPECT_FALSE(std::get<msgs::UpdateVerticesResponse>(write_res).error.has_value());
|
||||
|
||||
const auto commit_res = Commit(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::CommitResponse>(commit_res));
|
||||
EXPECT_FALSE(std::get<msgs::CommitResponse>(commit_res).error.has_value());
|
||||
}
|
||||
AssertVertexExists(pk, {secondary_label}, {{primary_property1, primary_key_val}});
|
||||
|
||||
// Remove primary label
|
||||
{
|
||||
msgs::UpdateVerticesRequest update_req;
|
||||
update_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
update_req.update_vertices = std::vector<msgs::UpdateVertex>{{pk, {}, {label}, {}}};
|
||||
|
||||
const auto write_res = shard_rsm->Apply(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::UpdateVerticesResponse>(write_res));
|
||||
EXPECT_FALSE(std::get<msgs::UpdateVerticesResponse>(write_res).error.has_value());
|
||||
|
||||
const auto commit_res = Commit(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::CommitResponse>(commit_res));
|
||||
EXPECT_FALSE(std::get<msgs::CommitResponse>(commit_res).error.has_value());
|
||||
}
|
||||
AssertVertexExists(pk, {}, {{primary_property1, primary_key_val}});
|
||||
}
|
||||
|
||||
TEST_F(ShardRSMTest, TestUpdatePrimaryLabel) {
|
||||
const msgs::Value primary_key_val{static_cast<int64_t>(1)};
|
||||
const msgs::PrimaryKey pk{primary_key_val};
|
||||
|
||||
// Create Vertex
|
||||
CreateVertex(pk, {}, {});
|
||||
|
||||
// Remove primary label
|
||||
{
|
||||
msgs::UpdateVerticesRequest update_req;
|
||||
update_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
update_req.update_vertices = std::vector<msgs::UpdateVertex>{{pk, {}, {primary_label}, {}}};
|
||||
|
||||
const auto write_res = shard_rsm->Apply(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::UpdateVerticesResponse>(write_res));
|
||||
EXPECT_TRUE(std::get<msgs::UpdateVerticesResponse>(write_res).error.has_value());
|
||||
}
|
||||
AssertVertexExists(pk, {}, {{primary_property1, primary_key_val}});
|
||||
|
||||
// Add different primary label
|
||||
{
|
||||
msgs::UpdateVerticesRequest update_req;
|
||||
update_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
update_req.update_vertices = std::vector<msgs::UpdateVertex>{{pk, {primary_label2}, {}, {}}};
|
||||
|
||||
const auto write_res = shard_rsm->Apply(update_req);
|
||||
ASSERT_TRUE(std::holds_alternative<msgs::UpdateVerticesResponse>(write_res));
|
||||
EXPECT_TRUE(std::get<msgs::UpdateVerticesResponse>(write_res).error.has_value());
|
||||
}
|
||||
AssertVertexExists(pk, {}, {{primary_property1, primary_key_val}});
|
||||
}
|
||||
|
||||
} // namespace memgraph::storage::v3::tests
|
@ -11,7 +11,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "storage/v3/storage.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/view.hpp"
|
||||
|
||||
namespace memgraph::storage::v3::tests {
|
||||
|
Loading…
Reference in New Issue
Block a user