Create ExpandOne request (#580)

Refactor CreateEdge into CreateExpand
This commit is contained in:
Jure Bajic 2022-10-19 13:55:46 +02:00 committed by GitHub
parent 07f34838bd
commit 6bb40a7f49
17 changed files with 401 additions and 93 deletions

View File

@ -54,6 +54,9 @@ struct AddressAndStatus {
memgraph::io::Address address;
Status status;
friend bool operator<(const AddressAndStatus &lhs, const AddressAndStatus &rhs) { return lhs.address < rhs.address; }
friend bool operator==(const AddressAndStatus &lhs, const AddressAndStatus &rhs) {
return lhs.address == rhs.address;
}
};
using PrimaryKey = std::vector<PropertyValue>;
@ -115,7 +118,12 @@ struct ShardMap {
bool mutated = false;
for (auto &[label_id, label_space] : label_spaces) {
for (auto &[low_key, shard] : label_space.shards) {
for (auto it = label_space.shards.begin(); it != label_space.shards.end(); it++) {
auto &[low_key, shard] = *it;
std::optional<PrimaryKey> high_key;
if (const auto next_it = std::next(it); next_it != label_space.shards.end()) {
high_key = next_it->first;
}
// TODO(tyler) avoid these triple-nested loops by having the heartbeat include better info
bool machine_contains_shard = false;
@ -133,7 +141,7 @@ struct ShardMap {
.uuid = aas.address.unique_id,
.label_id = label_id,
.min_key = low_key,
.max_key = std::nullopt,
.max_key = high_key,
.schema = schemas[label_id],
.config = Config{},
});
@ -150,7 +158,7 @@ struct ShardMap {
ret.push_back(ShardToInitialize{.uuid = address.unique_id,
.label_id = label_id,
.min_key = low_key,
.max_key = std::nullopt,
.max_key = high_key,
.schema = schemas[label_id],
.config = Config{}});
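Note on the hunk above: each shard's max_key is now derived from the successor shard's low key, so consecutive shards form half-open [min_key, max_key) ranges instead of all being unbounded. A minimal sketch of the same std::next iterator pattern over a plain std::map (names are illustrative, not from the codebase):

#include <iostream>
#include <iterator>
#include <map>
#include <optional>
#include <string>

int main() {
  // Stand-in for label_space.shards: low key -> shard name.
  std::map<int, std::string> shards{{0, "shard-a"}, {10, "shard-b"}, {20, "shard-c"}};
  for (auto it = shards.begin(); it != shards.end(); ++it) {
    std::optional<int> high_key;
    if (const auto next_it = std::next(it); next_it != shards.end()) {
      high_key = next_it->first;  // successor's low key bounds this shard
    }
    std::cout << it->second << " covers [" << it->first << ", "
              << (high_key ? std::to_string(*high_key) : "unbounded") << ")\n";
  }
}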

View File

@ -204,9 +204,45 @@ Value ToBoltValue(msgs::Value value) {
case msgs::Value::Type::Vertex:
case msgs::Value::Type::Edge:
case msgs::Value::Type::Path: {
throw utils::BasicException("Path, Vertex and Edge not supported!");
throw utils::BasicException("Vertex, Edge and Path are not supported!");
}
}
}
Value ToBoltValue(msgs::Value value, const coordinator::ShardMap & /*shard_map*/, storage::v3::View /*view*/) {
switch (value.type) {
case msgs::Value::Type::Null:
return {};
case msgs::Value::Type::Bool:
return {value.bool_v};
case msgs::Value::Type::Int64:
return {value.int_v};
case msgs::Value::Type::Double:
return {value.double_v};
case msgs::Value::Type::String:
return {std::string(value.string_v)};
case msgs::Value::Type::List: {
std::vector<Value> values;
values.reserve(value.list_v.size());
for (const auto &v : value.list_v) {
auto maybe_value = ToBoltValue(v);
values.emplace_back(std::move(maybe_value));
}
return Value{std::move(values)};
}
case msgs::Value::Type::Map: {
std::map<std::string, Value> map;
for (const auto &kv : value.map_v) {
auto maybe_value = ToBoltValue(kv.second);
map.emplace(kv.first, std::move(maybe_value));
}
return Value{std::move(map)};
}
case msgs::Value::Type::Vertex:
case msgs::Value::Type::Edge:
case msgs::Value::Type::Path: {
throw utils::BasicException("Vertex, Edge and Path are not supported!");
}
// TODO Conversion of Value to Date types is not supported
}
}

View File

@ -639,12 +639,15 @@ int main(int argc, char **argv) {
.listen_port = unique_local_addr_query.last_known_port,
};
const std::string property{"property"};
const std::string label{"label"};
memgraph::coordinator::ShardMap sm;
auto prop_map = sm.AllocatePropertyIds(std::vector<std::string>{"property"});
auto prop_map = sm.AllocatePropertyIds(std::vector<std::string>{property});
auto edge_type_map = sm.AllocateEdgeTypeIds(std::vector<std::string>{"edge_type"});
std::vector<memgraph::storage::v3::SchemaProperty> schema{
{prop_map.at("property"), memgraph::common::SchemaType::INT}};
sm.InitializeNewLabel("label", schema, 1, sm.shard_map_version);
std::vector<memgraph::storage::v3::SchemaProperty> schema{{prop_map.at(property), memgraph::common::SchemaType::INT}};
sm.InitializeNewLabel(label, schema, 1, sm.shard_map_version);
sm.SplitShard(sm.GetHlc(), sm.GetLabelId(label),
std::vector<memgraph::storage::v3::PropertyValue>{memgraph::storage::v3::PropertyValue{2}});
memgraph::coordinator::Coordinator coordinator{sm};

View File

@ -43,6 +43,8 @@ VertexAccessor EdgeAccessor::From() const { return VertexAccessor(Vertex{edge.sr
VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props)
: vertex(std::move(v)), properties(std::move(props)) {}
Label VertexAccessor::PrimaryLabel() const { return vertex.id.first; }
std::vector<Label> VertexAccessor::Labels() const { return vertex.labels; }
bool VertexAccessor::HasLabel(Label &label) const {

View File

@ -73,6 +73,8 @@ class VertexAccessor final {
using Label = msgs::Label;
VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props);
Label PrimaryLabel() const;
std::vector<Label> Labels() const;
bool HasLabel(Label &label) const;

View File

@ -25,6 +25,23 @@
namespace memgraph::query::v2 {
// Stores a range of ids that are available for assignment;
// currently used for edge id allocation
class IdAllocator {
public:
IdAllocator() = default;
IdAllocator(uint64_t low, uint64_t high) : current_edge_id_{low}, max_id_{high} {};
uint64_t AllocateId() {
MG_ASSERT(current_edge_id_ < max_id_, "Current edge id exceeded the maximum allocated id");
return current_edge_id_++;
}
private:
uint64_t current_edge_id_{0};
uint64_t max_id_{0};
};
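A self-contained sketch of how the allocator above is meant to be used once the interpreter has been granted a [low, high) range (values are made up; the class is copied locally only so the snippet compiles on its own):

#include <cassert>
#include <cstdint>

// Illustrative copy of the IdAllocator above.
class IdAllocator {
 public:
  IdAllocator(uint64_t low, uint64_t high) : current_edge_id_{low}, max_id_{high} {}
  uint64_t AllocateId() {
    assert(current_edge_id_ < max_id_ && "edge id range exhausted");
    return current_edge_id_++;
  }

 private:
  uint64_t current_edge_id_;
  uint64_t max_id_;
};

int main() {
  IdAllocator edge_ids{1'000, 1'000'000};     // range granted by the coordinator
  const auto first = edge_ids.AllocateId();   // 1000
  const auto second = edge_ids.AllocateId();  // 1001
  return static_cast<int>(second - first);    // 1
}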
struct EvaluationContext {
/// Memory for allocations during evaluation of a *single* Pull call.
///
@ -79,9 +96,9 @@ struct ExecutionContext {
plan::ProfilingStats stats;
plan::ProfilingStats *stats_root{nullptr};
ExecutionStats execution_stats;
// TriggerContextCollector *trigger_context_collector{nullptr};
utils::AsyncTimer timer;
msgs::ShardRequestManagerInterface *shard_request_manager{nullptr};
IdAllocator edge_ids_alloc;
};
static_assert(std::is_move_assignable_v<ExecutionContext>, "ExecutionContext must be move assignable!");

View File

@ -680,7 +680,6 @@ struct PullPlan {
PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &parameters, const bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
msgs::ShardRequestManagerInterface *shard_request_manager, const std::optional<size_t> memory_limit)
// TriggerContextCollector *trigger_context_collector, const std::optional<size_t> memory_limit)
: plan_(plan),
cursor_(plan->plan().MakeCursor(execution_memory)),
frame_(plan->symbol_table().max_position(), execution_memory),
@ -696,7 +695,6 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
}
ctx_.is_shutting_down = &interpreter_context->is_shutting_down;
ctx_.is_profile_query = is_profile_query;
// ctx_.trigger_context_collector = trigger_context_collector;
ctx_.shard_request_manager = shard_request_manager;
}
@ -805,6 +803,20 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_
coordinator::CoordinatorClient<io::local_transport::LocalTransport>(
query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}),
std::move(query_io));
// Pre-allocate a batch of edge ids from the coordinator
coordinator::CoordinatorWriteRequests requests{coordinator::AllocateEdgeIdBatchRequest{.batch_size = 1000000}};
io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests> ww;
ww.operation = requests;
auto resp = interpreter_context_->io
.Request<io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests>,
io::rsm::WriteResponse<coordinator::CoordinatorWriteResponses>>(
interpreter_context_->coordinator_address, ww)
.Wait();
if (resp.HasValue()) {
const auto alloc_edge_id_reps =
std::get<coordinator::AllocateEdgeIdBatchResponse>(resp.GetValue().message.write_return);
interpreter_context_->edge_ids_alloc = {alloc_edge_id_reps.low, alloc_edge_id_reps.high};
}
}
PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper) {
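The constructor change above fetches a batch of 1,000,000 edge ids from the coordinator once, so every subsequent CREATE can assign ids locally without a per-edge network round-trip. A hypothetical sketch of the coordinator-side counter this implies (the actual AllocateEdgeIdBatchRequest handler is not part of this diff):

#include <cstdint>

struct AllocateEdgeIdBatchResponse {
  uint64_t low;
  uint64_t high;
};

// Hypothetical handler: hand out disjoint [low, high) ranges from a monotonic counter.
class EdgeIdBatchAllocator {
 public:
  AllocateEdgeIdBatchResponse Allocate(uint64_t batch_size) {
    const uint64_t low = next_;
    next_ += batch_size;
    return {.low = low, .high = next_};
  }

 private:
  uint64_t next_{0};
};

int main() {
  EdgeIdBatchAllocator allocator;
  const auto batch = allocator.Allocate(1'000'000);
  return batch.low == 0 && batch.high == 1'000'000 ? 0 : 1;
}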

View File

@ -12,6 +12,7 @@
#pragma once
#include <gflags/gflags.h>
#include <cstdint>
#include "coordinator/coordinator.hpp"
#include "coordinator/coordinator_client.hpp"
@ -30,6 +31,7 @@
#include "query/v2/metadata.hpp"
#include "query/v2/plan/operator.hpp"
#include "query/v2/plan/read_write_type_checker.hpp"
#include "query/v2/requests.hpp"
#include "query/v2/stream.hpp"
#include "storage/v3/isolation_level.hpp"
#include "storage/v3/name_id_mapper.hpp"
@ -184,6 +186,7 @@ struct InterpreterContext {
utils::SkipList<PlanCacheEntry> plan_cache;
const InterpreterConfig config;
IdAllocator edge_ids_alloc;
// TODO (antaljanosbenjamin) Figure out an abstraction for io::Io to make it possible to construct an interpreter
// context with a simulator transport without templatizing it.
@ -334,7 +337,7 @@ class Interpreter final {
// This cannot be std::optional because we need to move this accessor later on into a lambda capture
// which is assigned to std::function. std::function requires every object to be copyable, so we
// move this unique_ptr into a shrared_ptr.
// move this unique_ptr into a shared_ptr.
std::unique_ptr<storage::v3::Shard::Accessor> db_accessor_;
std::optional<DbAccessor> execution_db_accessor_;
std::unique_ptr<msgs::ShardRequestManagerInterface> shard_request_manager_;

View File

@ -183,15 +183,22 @@ class DistributedCreateNodeCursor : public Cursor {
std::vector<msgs::NewVertex> requests;
for (const auto &node_info : nodes_info_) {
msgs::NewVertex rqst;
MG_ASSERT(!node_info->labels.empty(), "Cannot determine primary label");
const auto primary_label = node_info->labels[0];
// TODO(jbajic) Fix properties not being sent;
// suggestion: ignore the distinction between properties and primary keys,
// since schema validation is done on the storage side
std::map<msgs::PropertyId, msgs::Value> properties;
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
storage::v3::View::NEW);
if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) {
for (const auto &[key, value_expression] : *node_info_properties) {
TypedValue val = value_expression->Accept(evaluator);
properties[key] = TypedValueToValue(val);
if (context.shard_request_manager->IsPrimaryKey(key)) {
rqst.primary_key.push_back(storage::v3::TypedValueToValue(val));
if (context.shard_request_manager->IsPrimaryKey(primary_label, key)) {
rqst.primary_key.push_back(TypedValueToValue(val));
} else {
properties[key] = TypedValueToValue(val);
}
}
} else {
@ -199,9 +206,10 @@ class DistributedCreateNodeCursor : public Cursor {
for (const auto &[key, value] : property_map) {
auto key_str = std::string(key);
auto property_id = context.shard_request_manager->NameToProperty(key_str);
properties[property_id] = TypedValueToValue(value);
if (context.shard_request_manager->IsPrimaryKey(property_id)) {
if (context.shard_request_manager->IsPrimaryKey(primary_label, property_id)) {
rqst.primary_key.push_back(storage::v3::TypedValueToValue(value));
} else {
properties[property_id] = TypedValueToValue(value);
}
}
}
@ -210,7 +218,7 @@ class DistributedCreateNodeCursor : public Cursor {
throw QueryRuntimeException("Primary label must be defined!");
}
// TODO(kostasrim) Copy non primary labels as well
rqst.label_ids.push_back(msgs::Label{node_info->labels[0]});
rqst.label_ids.push_back(msgs::Label{.id = primary_label});
requests.push_back(std::move(rqst));
}
return requests;
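The loops above split every evaluated property into either the request's primary key or its plain property map, keyed on IsPrimaryKey(primary_label, key). The same split in isolation, with simple stand-ins for the msgs:: types:

#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

int main() {
  const std::set<std::string> primary_key_props{"id"};  // from the primary label's schema
  const std::map<std::string, int> evaluated{{"id", 42}, {"name", 7}};

  std::vector<int> primary_key;           // becomes rqst.primary_key
  std::map<std::string, int> properties;  // becomes the request's property map
  for (const auto &[key, value] : evaluated) {
    if (primary_key_props.contains(key)) {
      primary_key.push_back(value);
    } else {
      properties.emplace(key, value);
    }
  }
  std::cout << primary_key.size() << " primary key value(s), " << properties.size() << " plain properties\n";
}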
@ -280,10 +288,12 @@ CreateExpand::CreateExpand(const NodeCreationInfo &node_info, const EdgeCreation
ACCEPT_WITH_INPUT(CreateExpand)
class DistributedCreateExpandCursor;
UniqueCursorPtr CreateExpand::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::CreateNodeOperator);
return MakeUniqueCursorPtr<CreateExpandCursor>(mem, *this, mem);
return MakeUniqueCursorPtr<DistributedCreateExpandCursor>(mem, input_, mem, *this);
}
std::vector<Symbol> CreateExpand::ModifiedSymbols(const SymbolTable &table) const {
@ -2338,4 +2348,103 @@ bool Foreach::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
return visitor.PostVisit(*this);
}
class DistributedCreateExpandCursor : public Cursor {
public:
using InputOperator = std::shared_ptr<memgraph::query::v2::plan::LogicalOperator>;
DistributedCreateExpandCursor(const InputOperator &op, utils::MemoryResource *mem, const CreateExpand &self)
: input_cursor_{op->MakeCursor(mem)}, self_{self} {}
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("CreateExpand");
if (!input_cursor_->Pull(frame, context)) {
return false;
}
auto &shard_manager = context.shard_request_manager;
ResetExecutionState();
shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame));
return true;
}
void Shutdown() override { input_cursor_->Shutdown(); }
void Reset() override {
input_cursor_->Reset();
ResetExecutionState();
}
// Get the existing (destination) other vertex
accessors::VertexAccessor &OtherVertex(Frame &frame) const {
// This assumes that the vertex exists
MG_ASSERT(self_.existing_node_, "Creating a vertex together with an edge is not supported!");
TypedValue &dest_node_value = frame[self_.node_info_.symbol];
ExpectType(self_.node_info_.symbol, dest_node_value, TypedValue::Type::Vertex);
return dest_node_value.ValueVertex();
}
std::vector<msgs::NewExpand> ExpandCreationInfoToRequest(ExecutionContext &context, Frame &frame) const {
std::vector<msgs::NewExpand> edge_requests;
for (const auto &edge_info : std::vector{self_.edge_info_}) {
msgs::NewExpand request{.id = {context.edge_ids_alloc.AllocateId()}};
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
storage::v3::View::NEW);
request.type = {edge_info.edge_type.AsUint()};
if (const auto *edge_info_properties = std::get_if<PropertiesMapList>(&edge_info.properties)) {
for (const auto &[property, value_expression] : *edge_info_properties) {
TypedValue val = value_expression->Accept(evaluator);
request.properties.emplace_back(property, storage::v3::TypedValueToValue(val));
}
} else {
// handle parameter
auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(edge_info.properties)).ValueMap();
for (const auto &[property, value] : property_map) {
const auto property_id = context.shard_request_manager->NameToProperty(std::string(property));
request.properties.emplace_back(property_id, storage::v3::TypedValueToValue(value));
}
}
// src, dest
TypedValue &v1_value = frame[self_.input_symbol_];
const auto &v1 = v1_value.ValueVertex();
const auto &v2 = OtherVertex(frame);
// Set src and dest vertices
// TODO(jbajic) Currently we only handle the scenario where the vertices
// are already matched
const auto set_vertex = [&context](const auto &vertex, auto &vertex_id) {
vertex_id.first = vertex.PrimaryLabel();
for (const auto &[key, val] : vertex.Properties()) {
if (context.shard_request_manager->IsPrimaryKey(vertex_id.first.id, key)) {
vertex_id.second.push_back(val);
}
}
};
std::invoke([&]() {
switch (edge_info.direction) {
case EdgeAtom::Direction::IN: {
set_vertex(v1, request.dest_vertex);
set_vertex(v2, request.src_vertex);
break;
}
case EdgeAtom::Direction::OUT: {
set_vertex(v1, request.src_vertex);
set_vertex(v2, request.dest_vertex);
break;
}
case EdgeAtom::Direction::BOTH:
LOG_FATAL("Must indicate exact expansion direction here");
}
});
edge_requests.push_back(std::move(request));
}
return edge_requests;
}
private:
void ResetExecutionState() { state_ = {}; }
const UniqueCursorPtr input_cursor_;
const CreateExpand &self_;
msgs::ExecutionState<msgs::CreateExpandRequest> state_;
};
} // namespace memgraph::query::v2::plan
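For orientation, assuming standard Cypher semantics for the direction flag (not restated in this diff; v1 is the already-matched input node, v2 the other endpoint), the src/dest assignment above corresponds to:

// CREATE (a)-[:T]->(b)  => Direction::OUT: src_vertex = a (input), dest_vertex = b (other)
// CREATE (a)<-[:T]-(b)  => Direction::IN:  src_vertex = b (other), dest_vertex = a (input)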

View File

@ -33,10 +33,7 @@ class VertexCountCache {
auto NameToLabel(const std::string &name) { return shard_request_manager_->LabelNameToLabelId(name); }
auto NameToProperty(const std::string &name) { return shard_request_manager_->NameToProperty(name); }
auto NameToEdgeType(const std::string & /*name*/) {
MG_ASSERT(false, "NameToEdgeType");
return storage::v3::EdgeTypeId::FromInt(0);
}
auto NameToEdgeType(const std::string &name) { return shard_request_manager_->NameToEdgeType(name); }
int64_t VerticesCount() { return 1; }

View File

@ -56,6 +56,9 @@ struct EdgeType {
struct EdgeId {
Gid gid;
friend bool operator==(const EdgeId &lhs, const EdgeId &rhs) { return lhs.gid == rhs.gid; }
friend bool operator<(const EdgeId &lhs, const EdgeId &rhs) { return lhs.gid < rhs.gid; }
};
struct Edge {
@ -64,9 +67,7 @@ struct Edge {
std::optional<std::vector<std::pair<PropertyId, Value>>> properties;
EdgeId id;
EdgeType type;
friend bool operator==(const Edge &lhs, const Edge &rhs) {
return (lhs.src == rhs.src) && (lhs.dst == rhs.dst) && (lhs.type == rhs.type);
}
friend bool operator==(const Edge &lhs, const Edge &rhs) { return lhs.id == rhs.id; }
};
struct Vertex {
@ -536,12 +537,22 @@ struct UpdateVerticesResponse {
/*
* Edges
*/
struct CreateEdgesRequest {
Hlc transaction_id;
std::vector<Edge> edges;
// No need to specify a direction: an expansion always has exactly one, and the src and dest
// vertices communicate it unambiguously
struct NewExpand {
EdgeId id;
EdgeType type;
VertexId src_vertex;
VertexId dest_vertex;
std::vector<std::pair<PropertyId, Value>> properties;
};
struct CreateEdgesResponse {
struct CreateExpandRequest {
Hlc transaction_id;
std::vector<NewExpand> new_expands;
};
struct CreateExpandResponse {
bool success;
};
@ -576,8 +587,8 @@ using ReadRequests = std::variant<ExpandOneRequest, GetPropertiesRequest, ScanVe
using ReadResponses = std::variant<ExpandOneResponse, GetPropertiesResponse, ScanVerticesResponse>;
using WriteRequests = std::variant<CreateVerticesRequest, DeleteVerticesRequest, UpdateVerticesRequest,
CreateEdgesRequest, DeleteEdgesRequest, UpdateEdgesRequest, CommitRequest>;
CreateExpandRequest, DeleteEdgesRequest, UpdateEdgesRequest, CommitRequest>;
using WriteResponses = std::variant<CreateVerticesResponse, DeleteVerticesResponse, UpdateVerticesResponse,
CreateEdgesResponse, DeleteEdgesResponse, UpdateEdgesResponse, CommitResponse>;
CreateExpandResponse, DeleteEdgesResponse, UpdateEdgesResponse, CommitResponse>;
} // namespace memgraph::msgs
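A self-contained sketch of assembling the new request shape, with simplified stand-ins for the msgs:: types above (field values are made up):

#include <cstdint>
#include <utility>
#include <vector>

// Simplified stand-ins for the msgs:: types in the hunk above; illustrative only.
struct EdgeId { uint64_t gid; };
struct EdgeType { uint64_t id; };
struct VertexId {
  uint64_t label_id;
  std::vector<int64_t> primary_key;
};
struct NewExpand {
  EdgeId id;
  EdgeType type;
  VertexId src_vertex;
  VertexId dest_vertex;
  std::vector<std::pair<uint64_t, int64_t>> properties;
};
struct CreateExpandRequest {
  uint64_t transaction_id;
  std::vector<NewExpand> new_expands;
};

int main() {
  // One expansion from primary key {1} to primary key {13} under label 5.
  NewExpand expand{.id = {.gid = 0},
                   .type = {.id = 1},
                   .src_vertex = {.label_id = 5, .primary_key = {1}},
                   .dest_vertex = {.label_id = 5, .primary_key = {13}},
                   .properties = {}};
  CreateExpandRequest req{.transaction_id = 42, .new_expands = {expand}};
  return static_cast<int>(req.new_expands.size());  // 1
}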

View File

@ -54,18 +54,20 @@ class RsmStorageClientManager {
RsmStorageClientManager &operator=(RsmStorageClientManager &&) = delete;
~RsmStorageClientManager() = default;
void AddClient(const LabelId label_id, Shard key, TStorageClient client) {
cli_cache_[label_id].insert({std::move(key), std::move(client)});
}
void AddClient(Shard key, TStorageClient client) { cli_cache_.emplace(std::move(key), std::move(client)); }
bool Exists(const LabelId label_id, const Shard &key) { return cli_cache_[label_id].contains(key); }
bool Exists(const Shard &key) { return cli_cache_.contains(key); }
void PurgeCache() { cli_cache_.clear(); }
TStorageClient &GetClient(const LabelId label_id, const Shard &key) { return cli_cache_[label_id].find(key)->second; }
TStorageClient &GetClient(const Shard &key) {
auto it = cli_cache_.find(key);
MG_ASSERT(it != cli_cache_.end(), "Non-existing shard client");
return it->second;
}
private:
std::map<LabelId, std::map<Shard, TStorageClient>> cli_cache_;
std::map<Shard, TStorageClient> cli_cache_;
};
template <typename TRequest>
@ -93,7 +95,7 @@ struct ExecutionState {
// a partial response on a shard (if there is one) is finished and we can send off the request for the next batch.
std::vector<Shard> shard_cache;
// 1-1 mapping with `shard_cache`.
// A vector that tracks request metatdata for each shard (For example, next_id for a ScanAll on Shard A)
// A vector that tracks request metadata for each shard (For example, next_id for a ScanAll on Shard A)
std::vector<TRequest> requests;
State state = INITIALIZING;
};
@ -115,6 +117,8 @@ class ShardRequestManagerInterface {
virtual std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
std::vector<NewVertex> new_vertices) = 0;
virtual std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) = 0;
virtual std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state,
std::vector<NewExpand> new_edges) = 0;
// TODO(antaljanosbenjamin): unify the GetXXXId and NameToId functions to have consistent naming, return type and
// implementation
virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0;
@ -123,7 +127,7 @@ class ShardRequestManagerInterface {
virtual const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const = 0;
virtual const std::string &LabelToName(memgraph::storage::v3::LabelId label) const = 0;
virtual const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const = 0;
virtual bool IsPrimaryKey(PropertyId name) const = 0;
virtual bool IsPrimaryKey(LabelId primary_label, PropertyId property) const = 0;
};
// TODO(kostasrim)rename this class template
@ -186,7 +190,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
for (const auto &[label, space] : shards_map_.label_spaces) {
for (const auto &[key, shard] : space.shards) {
auto &storage_client = GetStorageClientForShard(shard, label);
auto &storage_client = GetStorageClientForShard(shard);
// TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture
// instead.
auto commit_response = storage_client.SendWriteRequest(commit_req);
@ -204,15 +208,15 @@ class ShardRequestManager : public ShardRequestManagerInterface {
}
}
storage::v3::EdgeTypeId NameToEdgeType(const std::string & /*name*/) const override {
return memgraph::storage::v3::EdgeTypeId::FromUint(0);
storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const override {
return *shards_map_.GetEdgeTypeId(name);
}
storage::v3::PropertyId NameToProperty(const std::string &name) const override {
return *shards_map_.GetPropertyId(name);
}
memgraph::storage::v3::LabelId LabelNameToLabelId(const std::string &name) const override {
storage::v3::LabelId LabelNameToLabelId(const std::string &name) const override {
return shards_map_.GetLabelId(name);
}
@ -229,9 +233,13 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return str;
}
bool IsPrimaryKey(const PropertyId name) const override {
return std::find_if(shards_map_.properties.begin(), shards_map_.properties.end(),
[name](auto &pr) { return pr.second == name; }) != shards_map_.properties.end();
bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override {
const auto schema_it = shards_map_.schemas.find(primary_label);
MG_ASSERT(schema_it != shards_map_.schemas.end(), "Invalid primary label id: {}", primary_label.AsUint());
return std::find_if(schema_it->second.begin(), schema_it->second.end(), [property](const auto &schema_prop) {
return schema_prop.property_id == property;
}) != schema_it->second.end();
}
// TODO(kostasrim) Simplify return result
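The rewritten IsPrimaryKey consults the schema of the given primary label instead of the global property list, so a property id can be part of one label's primary key without being treated as a key everywhere. A standalone sketch of that lookup (types simplified; the real code asserts on an unknown label instead of returning false):

#include <cstdint>
#include <map>
#include <vector>

struct SchemaProperty { uint64_t property_id; };

bool IsPrimaryKey(const std::map<uint64_t, std::vector<SchemaProperty>> &schemas,
                  uint64_t primary_label, uint64_t property) {
  const auto it = schemas.find(primary_label);
  if (it == schemas.end()) return false;  // the real code asserts here
  for (const auto &schema_prop : it->second) {
    if (schema_prop.property_id == property) return true;
  }
  return false;
}

int main() {
  const std::map<uint64_t, std::vector<SchemaProperty>> schemas{{1, {{7}}}, {2, {{9}}}};
  return IsPrimaryKey(schemas, 1, 7) && !IsPrimaryKey(schemas, 2, 7) ? 0 : 1;
}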
@ -281,6 +289,34 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return responses;
}
std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state,
std::vector<NewExpand> new_edges) override {
MG_ASSERT(!new_edges.empty());
MaybeInitializeExecutionState(state, new_edges);
std::vector<CreateExpandResponse> responses;
auto &shard_cache_ref = state.shard_cache;
size_t id{0};
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
auto &storage_client = GetStorageClientForShard(*shard_it);
WriteRequests req = state.requests[id];
auto write_response_result = storage_client.SendWriteRequest(std::move(req));
if (write_response_result.HasError()) {
throw std::runtime_error("CreateVertices request timedout");
}
WriteResponses response_variant = write_response_result.GetValue();
CreateExpandResponse mapped_response = std::get<CreateExpandResponse>(response_variant);
if (!mapped_response.success) {
throw std::runtime_error("CreateExpand request did not succeed");
}
responses.push_back(mapped_response);
shard_it = shard_cache_ref.erase(shard_it);
}
// We are done with this state
MaybeCompleteState(state);
return responses;
}
std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) override {
// TODO(kostasrim)Update to limit the batch size here
// Expansions of the destination must be handled by the caller. For example
@ -363,6 +399,44 @@ class ShardRequestManager : public ShardRequestManagerInterface {
state.state = ExecutionState<CreateVerticesRequest>::EXECUTING;
}
void MaybeInitializeExecutionState(ExecutionState<CreateExpandRequest> &state, std::vector<NewExpand> new_expands) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
std::map<Shard, CreateExpandRequest> per_shard_request_table;
auto ensure_shard_exists_in_table = [&per_shard_request_table,
transaction_id = transaction_id_](const Shard &shard) {
if (!per_shard_request_table.contains(shard)) {
CreateExpandRequest create_expand_request{.transaction_id = transaction_id};
per_shard_request_table.insert({shard, std::move(create_expand_request)});
}
};
for (auto &new_expand : new_expands) {
const auto shard_src_vertex = shards_map_.GetShardForKey(
new_expand.src_vertex.first.id, storage::conversions::ConvertPropertyVector(new_expand.src_vertex.second));
const auto shard_dest_vertex = shards_map_.GetShardForKey(
new_expand.dest_vertex.first.id, storage::conversions::ConvertPropertyVector(new_expand.dest_vertex.second));
ensure_shard_exists_in_table(shard_src_vertex);
if (shard_src_vertex != shard_dest_vertex) {
ensure_shard_exists_in_table(shard_dest_vertex);
per_shard_request_table[shard_dest_vertex].new_expands.push_back(new_expand);
}
per_shard_request_table[shard_src_vertex].new_expands.push_back(std::move(new_expand));
}
for (auto &[shard, request] : per_shard_request_table) {
state.shard_cache.push_back(shard);
state.requests.push_back(std::move(request));
}
state.state = ExecutionState<CreateExpandRequest>::EXECUTING;
}
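Consequence of the partitioning above: an expand whose endpoints live on different shards is appended to both shards' requests, while a same-shard expand is sent once. A standalone illustration with shard resolution reduced to a key threshold (the real lookup goes through ShardMap::GetShardForKey):

#include <iostream>
#include <map>
#include <string>
#include <vector>

struct Expand { int src_key; int dest_key; };

// Toy shard resolution: keys below 10 live on "shard-a", the rest on "shard-b".
std::string ShardFor(int key) { return key < 10 ? "shard-a" : "shard-b"; }

int main() {
  std::vector<Expand> expands{{1, 13}, {2, 3}};
  std::map<std::string, std::vector<Expand>> per_shard;
  for (const auto &e : expands) {
    per_shard[ShardFor(e.src_key)].push_back(e);
    if (ShardFor(e.src_key) != ShardFor(e.dest_key)) {
      per_shard[ShardFor(e.dest_key)].push_back(e);  // cross-shard edge goes to both
    }
  }
  for (const auto &[shard, reqs] : per_shard) {
    std::cout << shard << " receives " << reqs.size() << " expand(s)\n";  // shard-a: 2, shard-b: 1
  }
}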
void MaybeInitializeExecutionState(ExecutionState<ScanVerticesRequest> &state) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
@ -415,20 +489,19 @@ class ShardRequestManager : public ShardRequestManagerInterface {
state.state = ExecutionState<ExpandOneRequest>::EXECUTING;
}
StorageClient &GetStorageClientForShard(Shard shard, LabelId label_id) {
if (!storage_cli_manager_.Exists(label_id, shard)) {
AddStorageClientToManager(shard, label_id);
StorageClient &GetStorageClientForShard(Shard shard) {
if (!storage_cli_manager_.Exists(shard)) {
AddStorageClientToManager(shard);
}
return storage_cli_manager_.GetClient(label_id, shard);
return storage_cli_manager_.GetClient(shard);
}
StorageClient &GetStorageClientForShard(const std::string &label, const CompoundKey &key) {
auto shard = shards_map_.GetShardForKey(label, key);
auto label_id = shards_map_.GetLabelId(label);
return GetStorageClientForShard(std::move(shard), label_id);
return GetStorageClientForShard(std::move(shard));
}
void AddStorageClientToManager(Shard target_shard, const LabelId &label_id) {
void AddStorageClientToManager(Shard target_shard) {
MG_ASSERT(!target_shard.empty());
auto leader_addr = target_shard.front();
std::vector<Address> addresses;
@ -437,7 +510,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
addresses.push_back(std::move(address.address));
}
auto cli = StorageClient(io_, std::move(leader_addr.address), std::move(addresses));
storage_cli_manager_.AddClient(label_id, target_shard, std::move(cli));
storage_cli_manager_.AddClient(target_shard, std::move(cli));
}
void SendAllRequests(ExecutionState<ScanVerticesRequest> &state) {
@ -461,7 +534,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
new_vertex.label_ids.erase(new_vertex.label_ids.begin());
}
auto &storage_client = GetStorageClientForShard(*shard_it, labels[0].id);
auto &storage_client = GetStorageClientForShard(*shard_it);
WriteRequests req = req_deep_copy;
storage_client.SendAsyncWriteRequest(req);
@ -473,8 +546,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
std::vector<memgraph::coordinator::Shard> &shard_cache_ref) {
size_t id = 0;
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
const Label primary_label = state.requests[id].src_vertices[0].first;
auto &storage_client = GetStorageClientForShard(*shard_it, primary_label.id);
auto &storage_client = GetStorageClientForShard(*shard_it);
ReadRequests req = state.requests[id];
storage_client.SendAsyncReadRequest(req);
}
@ -488,7 +560,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
// This is fine because all new_vertices of each request end up on the same shard
const auto labels = state.requests[request_idx].new_vertices[0].label_ids;
auto &storage_client = GetStorageClientForShard(*shard_it, labels[0].id);
auto &storage_client = GetStorageClientForShard(*shard_it);
auto poll_result = storage_client.AwaitAsyncWriteRequest();
if (!poll_result) {

View File

@ -83,7 +83,7 @@ class DbAccessor final {
}
storage::v3::ResultSchema<VertexAccessor> InsertVertexAndValidate(
const storage::v3::LabelId primary_label, const std::vector<storage::v3::LabelId> &labels,
storage::v3::LabelId primary_label, const std::vector<storage::v3::LabelId> &labels,
const std::vector<std::pair<storage::v3::PropertyId, storage::v3::PropertyValue>> &properties) {
auto maybe_vertex_acc = accessor_->CreateVertexAndValidate(primary_label, labels, properties);
if (maybe_vertex_acc.HasError()) {

View File

@ -564,38 +564,54 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) {
return memgraph::msgs::DeleteVerticesResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateEdgesRequest &&req) {
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateExpandRequest &&req) {
auto acc = shard_->Access(req.transaction_id);
bool action_successful = true;
for (auto &edge : req.edges) {
auto vertex_acc_from_primary_key = edge.src.second;
for (auto &new_expand : req.new_expands) {
auto vertex_acc_from_primary_key = new_expand.src_vertex.second;
auto vertex_from_acc = acc.FindVertex(ConvertPropertyVector(std::move(vertex_acc_from_primary_key)), View::OLD);
auto vertex_acc_to_primary_key = edge.dst.second;
auto vertex_acc_to_primary_key = new_expand.dest_vertex.second;
auto vertex_to_acc = acc.FindVertex(ConvertPropertyVector(std::move(vertex_acc_to_primary_key)), View::OLD);
if (!vertex_from_acc || !vertex_to_acc) {
if (!(vertex_from_acc || vertex_to_acc)) {
action_successful = false;
spdlog::debug("Error while trying to insert edge, vertex does not exist. Transaction id: {}",
req.transaction_id.logical_id);
break;
}
auto from_vertex_id = VertexId(edge.src.first.id, ConvertPropertyVector(std::move(edge.src.second)));
auto to_vertex_id = VertexId(edge.dst.first.id, ConvertPropertyVector(std::move(edge.dst.second)));
auto edge_acc =
acc.CreateEdge(from_vertex_id, to_vertex_id, EdgeTypeId::FromUint(edge.type.id), Gid::FromUint(edge.id.gid));
if (edge_acc.HasError()) {
auto from_vertex_id =
VertexId(new_expand.src_vertex.first.id, ConvertPropertyVector(std::move(new_expand.src_vertex.second)));
auto to_vertex_id =
VertexId(new_expand.dest_vertex.first.id, ConvertPropertyVector(std::move(new_expand.dest_vertex.second)));
auto edge_acc = acc.CreateEdge(from_vertex_id, to_vertex_id, EdgeTypeId::FromUint(new_expand.type.id),
Gid::FromUint(new_expand.id.gid));
if (edge_acc.HasValue()) {
auto edge = edge_acc.GetValue();
if (!new_expand.properties.empty()) {
for (const auto &[property, value] : new_expand.properties) {
if (const auto maybe_error = edge.SetProperty(property, ToPropertyValue(value)); maybe_error.HasError()) {
action_successful = false;
spdlog::debug("Setting edge property was not successful. Transaction id: {}",
req.transaction_id.logical_id);
break;
}
if (!action_successful) {
break;
}
}
}
} else {
action_successful = false;
spdlog::debug("Creating edge was not successful. Transaction id: {}", req.transaction_id.logical_id);
break;
}
// Add properties to the edge if there is any
if (edge.properties) {
for (auto &[edge_prop_key, edge_prop_val] : edge.properties.value()) {
if (!new_expand.properties.empty()) {
for (auto &[edge_prop_key, edge_prop_val] : new_expand.properties) {
auto set_result = edge_acc->SetProperty(edge_prop_key, ToPropertyValue(std::move(edge_prop_val)));
if (set_result.HasError()) {
action_successful = false;
@ -607,7 +623,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateEdgesRequest &&req) {
}
}
return memgraph::msgs::CreateEdgesResponse{.success = action_successful};
return memgraph::msgs::CreateExpandResponse{.success = action_successful};
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteEdgesRequest &&req) {

View File

@ -35,7 +35,7 @@ class ShardRsm {
msgs::WriteResponses ApplyWrite(msgs::DeleteVerticesRequest &&req);
msgs::WriteResponses ApplyWrite(msgs::UpdateVerticesRequest &&req);
msgs::WriteResponses ApplyWrite(msgs::CreateEdgesRequest &&req);
msgs::WriteResponses ApplyWrite(msgs::CreateExpandRequest &&req);
msgs::WriteResponses ApplyWrite(msgs::DeleteEdgesRequest &&req);
msgs::WriteResponses ApplyWrite(msgs::UpdateEdgesRequest &&req);

View File

@ -191,6 +191,26 @@ void TestCreateVertices(ShardRequestManager &io) {
MG_ASSERT(result.size() == 2);
}
template <typename ShardRequestManager>
void TestCreateExpand(ShardRequestManager &io) {
using PropVal = memgraph::msgs::Value;
memgraph::msgs::ExecutionState<memgraph::msgs::CreateExpandRequest> state;
std::vector<memgraph::msgs::NewExpand> new_expands;
const auto edge_type_id = io.NameToEdgeType("edge_type");
const auto label_id = io.LabelNameToLabelId("test_label");
const VertexId vertex_id_1{label_id, {PropVal(int64_t(1)), PropVal(int64_t(0))}};
const VertexId vertex_id_2{label_id, {PropVal(int64_t(13)), PropVal(int64_t(13))}};
memgraph::msgs::NewExpand expand_1{
.id = 0, .type = edge_type_id, .src_vertex = vertex_id_1, .dest_vertex = vertex_id_2};
memgraph::msgs::NewExpand expand_2{
.id = 1, .type = edge_type_id, .src_vertex = vertex_id_2, .dest_vertex = vertex_id_1};
new_expands.push_back(std::move(expand_1));
new_expands.push_back(std::move(expand_2));
auto result = io.Request(state, std::move(new_expands));
MG_ASSERT(result.size() == 2);
}
template <typename ShardRequestManager>
void TestExpand(ShardRequestManager &io) {}

View File

@ -221,15 +221,14 @@ bool AttemptToAddEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t va
auto type = msgs::EdgeType{};
type.id = edge_type_id;
auto edge = msgs::Edge{};
msgs::NewExpand edge;
edge.id = id;
edge.type = type;
edge.src = src;
edge.dst = dst;
edge.properties = std::nullopt;
edge.src_vertex = src;
edge.dest_vertex = dst;
msgs::CreateEdgesRequest create_req{};
create_req.edges = {edge};
msgs::CreateExpandRequest create_req{};
create_req.new_expands = {edge};
create_req.transaction_id.logical_id = GetTransactionId();
while (true) {
@ -239,18 +238,19 @@ bool AttemptToAddEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t va
}
auto write_response_result = write_res.GetValue();
auto write_response = std::get<msgs::CreateEdgesResponse>(write_response_result);
auto write_response = std::get<msgs::CreateExpandResponse>(write_response_result);
Commit(client, create_req.transaction_id);
return write_response.success;
}
return true;
}
bool AttemptToAddEdgeWithProperties(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2,
int64_t edge_gid, uint64_t edge_prop_id, int64_t edge_prop_val,
const std::vector<uint64_t> &edge_type_id) {
auto id1 = msgs::EdgeId{};
msgs::EdgeId id1;
msgs::Label label = {.id = get_primary_label()};
auto src = std::make_pair(label, GetPrimaryKey(value_of_vertex_1));
@ -262,19 +262,19 @@ bool AttemptToAddEdgeWithProperties(ShardClient &client, int64_t value_of_vertex
auto edge_prop = std::make_pair(PropertyId::FromUint(edge_prop_id), msgs::Value(edge_prop_val));
auto edge = msgs::Edge{};
edge.id = id1;
edge.type = type1;
edge.src = src;
edge.dst = dst;
edge.properties = {edge_prop};
auto expand = msgs::NewExpand{};
expand.id = id1;
expand.type = type1;
expand.src_vertex = src;
expand.dest_vertex = dst;
expand.properties = {edge_prop};
msgs::CreateEdgesRequest create_req{};
create_req.edges = {edge};
msgs::CreateExpandRequest create_req{};
create_req.new_expands = {expand};
create_req.transaction_id.logical_id = GetTransactionId();
auto write_res = client.SendWriteRequest(create_req);
MG_ASSERT(write_res.HasValue() && std::get<msgs::CreateEdgesResponse>(write_res.GetValue()).success,
MG_ASSERT(write_res.HasValue() && std::get<msgs::CreateExpandResponse>(write_res.GetValue()).success,
"Unexpected failure");
Commit(client, create_req.transaction_id);