Merge branch 'master' into Implement-constant-time-label-and-edge-type-retrieval
This commit is contained in:
commit
997779fe07
@ -370,8 +370,9 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
constexpr bool kSharedAccess = false;
|
||||
|
||||
std::optional<std::pair<uint64_t, storage::InMemoryStorage::ReplicationAccessor>> commit_timestamp_and_accessor;
|
||||
auto get_transaction = [storage, &commit_timestamp_and_accessor](uint64_t commit_timestamp,
|
||||
bool unique = kSharedAccess) {
|
||||
auto const get_transaction = [storage, &commit_timestamp_and_accessor](
|
||||
uint64_t commit_timestamp,
|
||||
bool unique = kSharedAccess) -> storage::InMemoryStorage::ReplicationAccessor * {
|
||||
if (!commit_timestamp_and_accessor) {
|
||||
std::unique_ptr<storage::Storage::Accessor> acc = nullptr;
|
||||
if (unique) {
|
||||
@ -415,9 +416,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
spdlog::trace(" Delete vertex {}", delta.vertex_create_delete.gid.AsUint());
|
||||
auto *transaction = get_transaction(timestamp);
|
||||
auto vertex = transaction->FindVertex(delta.vertex_create_delete.gid, View::NEW);
|
||||
if (!vertex) throw utils::BasicException("Invalid transaction!");
|
||||
if (!vertex)
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
auto ret = transaction->DeleteVertex(&*vertex);
|
||||
if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!");
|
||||
if (ret.HasError() || !ret.GetValue())
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::VERTEX_ADD_LABEL: {
|
||||
@ -425,9 +428,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
delta.vertex_add_remove_label.label);
|
||||
auto *transaction = get_transaction(timestamp);
|
||||
auto vertex = transaction->FindVertex(delta.vertex_add_remove_label.gid, View::NEW);
|
||||
if (!vertex) throw utils::BasicException("Invalid transaction!");
|
||||
if (!vertex)
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
auto ret = vertex->AddLabel(transaction->NameToLabel(delta.vertex_add_remove_label.label));
|
||||
if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!");
|
||||
if (ret.HasError() || !ret.GetValue())
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::VERTEX_REMOVE_LABEL: {
|
||||
@ -435,9 +440,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
delta.vertex_add_remove_label.label);
|
||||
auto *transaction = get_transaction(timestamp);
|
||||
auto vertex = transaction->FindVertex(delta.vertex_add_remove_label.gid, View::NEW);
|
||||
if (!vertex) throw utils::BasicException("Invalid transaction!");
|
||||
if (!vertex)
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
auto ret = vertex->RemoveLabel(transaction->NameToLabel(delta.vertex_add_remove_label.label));
|
||||
if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!");
|
||||
if (ret.HasError() || !ret.GetValue())
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::VERTEX_SET_PROPERTY: {
|
||||
@ -445,10 +452,12 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
delta.vertex_edge_set_property.property, delta.vertex_edge_set_property.value);
|
||||
auto *transaction = get_transaction(timestamp);
|
||||
auto vertex = transaction->FindVertex(delta.vertex_edge_set_property.gid, View::NEW);
|
||||
if (!vertex) throw utils::BasicException("Invalid transaction!");
|
||||
if (!vertex)
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
auto ret = vertex->SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property),
|
||||
delta.vertex_edge_set_property.value);
|
||||
if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
|
||||
if (ret.HasError())
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::EDGE_CREATE: {
|
||||
@ -457,13 +466,16 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
delta.edge_create_delete.from_vertex.AsUint(), delta.edge_create_delete.to_vertex.AsUint());
|
||||
auto *transaction = get_transaction(timestamp);
|
||||
auto from_vertex = transaction->FindVertex(delta.edge_create_delete.from_vertex, View::NEW);
|
||||
if (!from_vertex) throw utils::BasicException("Invalid transaction!");
|
||||
if (!from_vertex)
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, View::NEW);
|
||||
if (!to_vertex) throw utils::BasicException("Invalid transaction!");
|
||||
if (!to_vertex)
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
auto edge = transaction->CreateEdgeEx(&*from_vertex, &*to_vertex,
|
||||
transaction->NameToEdgeType(delta.edge_create_delete.edge_type),
|
||||
delta.edge_create_delete.gid);
|
||||
if (edge.HasError()) throw utils::BasicException("Invalid transaction!");
|
||||
if (edge.HasError())
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::EDGE_DELETE: {
|
||||
@ -472,16 +484,17 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
delta.edge_create_delete.from_vertex.AsUint(), delta.edge_create_delete.to_vertex.AsUint());
|
||||
auto *transaction = get_transaction(timestamp);
|
||||
auto from_vertex = transaction->FindVertex(delta.edge_create_delete.from_vertex, View::NEW);
|
||||
if (!from_vertex) throw utils::BasicException("Invalid transaction!");
|
||||
if (!from_vertex)
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, View::NEW);
|
||||
if (!to_vertex) throw utils::BasicException("Invalid transaction!");
|
||||
auto edges = from_vertex->OutEdges(View::NEW, {transaction->NameToEdgeType(delta.edge_create_delete.edge_type)},
|
||||
&*to_vertex);
|
||||
if (edges.HasError()) throw utils::BasicException("Invalid transaction!");
|
||||
if (edges->edges.size() != 1) throw utils::BasicException("Invalid transaction!");
|
||||
auto &edge = (*edges).edges[0];
|
||||
auto ret = transaction->DeleteEdge(&edge);
|
||||
if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
|
||||
if (!to_vertex)
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
auto edgeType = transaction->NameToEdgeType(delta.edge_create_delete.edge_type);
|
||||
auto edge =
|
||||
transaction->FindEdge(delta.edge_create_delete.gid, View::NEW, edgeType, &*from_vertex, &*to_vertex);
|
||||
if (!edge) throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
if (auto ret = transaction->DeleteEdge(&*edge); ret.HasError())
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::EDGE_SET_PROPERTY: {
|
||||
@ -498,7 +511,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
// yields an accessor that is only valid for managing the edge's
|
||||
// properties.
|
||||
auto edge = edge_acc.find(delta.vertex_edge_set_property.gid);
|
||||
if (edge == edge_acc.end()) throw utils::BasicException("Invalid transaction!");
|
||||
if (edge == edge_acc.end())
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
// The edge visibility check must be done here manually because we
|
||||
// don't allow direct access to the edges through the public API.
|
||||
{
|
||||
@ -530,7 +544,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
}
|
||||
}
|
||||
});
|
||||
if (!is_visible) throw utils::BasicException("Invalid transaction!");
|
||||
if (!is_visible)
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
}
|
||||
EdgeRef edge_ref(&*edge);
|
||||
// Here we create an edge accessor that we will use to get the
|
||||
@ -543,7 +558,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
|
||||
auto ret = ea.SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property),
|
||||
delta.vertex_edge_set_property.value);
|
||||
if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
|
||||
if (ret.HasError())
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -553,7 +569,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
throw utils::BasicException("Invalid commit data!");
|
||||
auto ret =
|
||||
commit_timestamp_and_accessor->second.Commit(commit_timestamp_and_accessor->first, false /* not main */);
|
||||
if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
|
||||
if (ret.HasError())
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
commit_timestamp_and_accessor = std::nullopt;
|
||||
break;
|
||||
}
|
||||
@ -563,14 +580,14 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
// Need to send the timestamp
|
||||
auto *transaction = get_transaction(timestamp, kUniqueAccess);
|
||||
if (transaction->CreateIndex(storage->NameToLabel(delta.operation_label.label)).HasError())
|
||||
throw utils::BasicException("Invalid transaction!");
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::LABEL_INDEX_DROP: {
|
||||
spdlog::trace(" Drop label index on :{}", delta.operation_label.label);
|
||||
auto *transaction = get_transaction(timestamp, kUniqueAccess);
|
||||
if (transaction->DropIndex(storage->NameToLabel(delta.operation_label.label)).HasError())
|
||||
throw utils::BasicException("Invalid transaction!");
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::LABEL_INDEX_STATS_SET: {
|
||||
@ -601,7 +618,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
->CreateIndex(storage->NameToLabel(delta.operation_label_property.label),
|
||||
storage->NameToProperty(delta.operation_label_property.property))
|
||||
.HasError())
|
||||
throw utils::BasicException("Invalid transaction!");
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: {
|
||||
@ -612,7 +629,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
->DropIndex(storage->NameToLabel(delta.operation_label_property.label),
|
||||
storage->NameToProperty(delta.operation_label_property.property))
|
||||
.HasError())
|
||||
throw utils::BasicException("Invalid transaction!");
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_STATS_SET: {
|
||||
@ -644,7 +661,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
auto ret =
|
||||
transaction->CreateExistenceConstraint(storage->NameToLabel(delta.operation_label_property.label),
|
||||
storage->NameToProperty(delta.operation_label_property.property));
|
||||
if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
|
||||
if (ret.HasError())
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: {
|
||||
@ -655,7 +673,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
->DropExistenceConstraint(storage->NameToLabel(delta.operation_label_property.label),
|
||||
storage->NameToProperty(delta.operation_label_property.property))
|
||||
.HasError())
|
||||
throw utils::BasicException("Invalid transaction!");
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE: {
|
||||
@ -670,7 +688,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
auto ret = transaction->CreateUniqueConstraint(storage->NameToLabel(delta.operation_label_properties.label),
|
||||
properties);
|
||||
if (!ret.HasValue() || ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS)
|
||||
throw utils::BasicException("Invalid transaction!");
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP: {
|
||||
@ -685,7 +703,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage
|
||||
auto ret =
|
||||
transaction->DropUniqueConstraint(storage->NameToLabel(delta.operation_label_properties.label), properties);
|
||||
if (ret != UniqueConstraints::DeletionStatus::SUCCESS) {
|
||||
throw utils::BasicException("Invalid transaction!");
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -10,6 +10,7 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <optional>
|
||||
#include <span>
|
||||
|
@ -1818,6 +1818,10 @@ class EdgeAtom : public memgraph::query::PatternAtom {
|
||||
memgraph::query::Identifier *inner_edge{nullptr};
|
||||
/// Argument identifier for the destination node of the edge.
|
||||
memgraph::query::Identifier *inner_node{nullptr};
|
||||
/// Argument identifier for the currently-accumulated path.
|
||||
memgraph::query::Identifier *accumulated_path{nullptr};
|
||||
/// Argument identifier for the weight of the currently-accumulated path.
|
||||
memgraph::query::Identifier *accumulated_weight{nullptr};
|
||||
/// Evaluates the result of the lambda.
|
||||
memgraph::query::Expression *expression{nullptr};
|
||||
|
||||
@ -1825,6 +1829,8 @@ class EdgeAtom : public memgraph::query::PatternAtom {
|
||||
Lambda object;
|
||||
object.inner_edge = inner_edge ? inner_edge->Clone(storage) : nullptr;
|
||||
object.inner_node = inner_node ? inner_node->Clone(storage) : nullptr;
|
||||
object.accumulated_path = accumulated_path ? accumulated_path->Clone(storage) : nullptr;
|
||||
object.accumulated_weight = accumulated_weight ? accumulated_weight->Clone(storage) : nullptr;
|
||||
object.expression = expression ? expression->Clone(storage) : nullptr;
|
||||
return object;
|
||||
}
|
||||
|
@ -1986,6 +1986,15 @@ antlrcpp::Any CypherMainVisitor::visitRelationshipPattern(MemgraphCypher::Relati
|
||||
edge_lambda.inner_edge = storage_->Create<Identifier>(traversed_edge_variable);
|
||||
auto traversed_node_variable = std::any_cast<std::string>(lambda->traversed_node->accept(this));
|
||||
edge_lambda.inner_node = storage_->Create<Identifier>(traversed_node_variable);
|
||||
if (lambda->accumulated_path) {
|
||||
auto accumulated_path_variable = std::any_cast<std::string>(lambda->accumulated_path->accept(this));
|
||||
edge_lambda.accumulated_path = storage_->Create<Identifier>(accumulated_path_variable);
|
||||
|
||||
if (lambda->accumulated_weight) {
|
||||
auto accumulated_weight_variable = std::any_cast<std::string>(lambda->accumulated_weight->accept(this));
|
||||
edge_lambda.accumulated_weight = storage_->Create<Identifier>(accumulated_weight_variable);
|
||||
}
|
||||
}
|
||||
edge_lambda.expression = std::any_cast<Expression *>(lambda->expression()->accept(this));
|
||||
return edge_lambda;
|
||||
};
|
||||
@ -2010,6 +2019,15 @@ antlrcpp::Any CypherMainVisitor::visitRelationshipPattern(MemgraphCypher::Relati
|
||||
// In variable expansion inner variables are mandatory.
|
||||
anonymous_identifiers.push_back(&edge->filter_lambda_.inner_edge);
|
||||
anonymous_identifiers.push_back(&edge->filter_lambda_.inner_node);
|
||||
|
||||
// TODO: In what use case do we need accumulated path and weight here?
|
||||
if (edge->filter_lambda_.accumulated_path) {
|
||||
anonymous_identifiers.push_back(&edge->filter_lambda_.accumulated_path);
|
||||
|
||||
if (edge->filter_lambda_.accumulated_weight) {
|
||||
anonymous_identifiers.push_back(&edge->filter_lambda_.accumulated_weight);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 1:
|
||||
if (edge->type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH ||
|
||||
@ -2021,9 +2039,21 @@ antlrcpp::Any CypherMainVisitor::visitRelationshipPattern(MemgraphCypher::Relati
|
||||
// Add mandatory inner variables for filter lambda.
|
||||
anonymous_identifiers.push_back(&edge->filter_lambda_.inner_edge);
|
||||
anonymous_identifiers.push_back(&edge->filter_lambda_.inner_node);
|
||||
if (edge->filter_lambda_.accumulated_path) {
|
||||
anonymous_identifiers.push_back(&edge->filter_lambda_.accumulated_path);
|
||||
|
||||
if (edge->filter_lambda_.accumulated_weight) {
|
||||
anonymous_identifiers.push_back(&edge->filter_lambda_.accumulated_weight);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Other variable expands only have the filter lambda.
|
||||
edge->filter_lambda_ = visit_lambda(relationshipLambdas[0]);
|
||||
if (edge->filter_lambda_.accumulated_weight) {
|
||||
throw SemanticException(
|
||||
"Accumulated weight in filter lambda can be used only with "
|
||||
"shortest paths expansion.");
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 2:
|
||||
|
@ -179,7 +179,7 @@ relationshipDetail : '[' ( name=variable )? ( relationshipTypes )? ( variableExp
|
||||
| '[' ( name=variable )? ( relationshipTypes )? ( variableExpansion )? relationshipLambda ( total_weight=variable )? (relationshipLambda )? ']'
|
||||
| '[' ( name=variable )? ( relationshipTypes )? ( variableExpansion )? (properties )* ( relationshipLambda total_weight=variable )? (relationshipLambda )? ']';
|
||||
|
||||
relationshipLambda: '(' traversed_edge=variable ',' traversed_node=variable '|' expression ')';
|
||||
relationshipLambda: '(' traversed_edge=variable ',' traversed_node=variable ( ',' accumulated_path=variable )? ( ',' accumulated_weight=variable )? '|' expression ')';
|
||||
|
||||
variableExpansion : '*' (BFS | WSHORTEST | ALLSHORTEST)? ( expression )? ( '..' ( expression )? )? ;
|
||||
|
||||
|
@ -658,8 +658,16 @@ bool SymbolGenerator::PreVisit(EdgeAtom &edge_atom) {
|
||||
scope.in_edge_range = false;
|
||||
scope.in_pattern = false;
|
||||
if (edge_atom.filter_lambda_.expression) {
|
||||
VisitWithIdentifiers(edge_atom.filter_lambda_.expression,
|
||||
{edge_atom.filter_lambda_.inner_edge, edge_atom.filter_lambda_.inner_node});
|
||||
std::vector<Identifier *> filter_lambda_identifiers{edge_atom.filter_lambda_.inner_edge,
|
||||
edge_atom.filter_lambda_.inner_node};
|
||||
if (edge_atom.filter_lambda_.accumulated_path) {
|
||||
filter_lambda_identifiers.emplace_back(edge_atom.filter_lambda_.accumulated_path);
|
||||
|
||||
if (edge_atom.filter_lambda_.accumulated_weight) {
|
||||
filter_lambda_identifiers.emplace_back(edge_atom.filter_lambda_.accumulated_weight);
|
||||
}
|
||||
}
|
||||
VisitWithIdentifiers(edge_atom.filter_lambda_.expression, filter_lambda_identifiers);
|
||||
} else {
|
||||
// Create inner symbols, but don't bind them in scope, since they are to
|
||||
// be used in the missing filter expression.
|
||||
@ -668,6 +676,17 @@ bool SymbolGenerator::PreVisit(EdgeAtom &edge_atom) {
|
||||
auto *inner_node = edge_atom.filter_lambda_.inner_node;
|
||||
inner_node->MapTo(
|
||||
symbol_table_->CreateSymbol(inner_node->name_, inner_node->user_declared_, Symbol::Type::VERTEX));
|
||||
if (edge_atom.filter_lambda_.accumulated_path) {
|
||||
auto *accumulated_path = edge_atom.filter_lambda_.accumulated_path;
|
||||
accumulated_path->MapTo(
|
||||
symbol_table_->CreateSymbol(accumulated_path->name_, accumulated_path->user_declared_, Symbol::Type::PATH));
|
||||
|
||||
if (edge_atom.filter_lambda_.accumulated_weight) {
|
||||
auto *accumulated_weight = edge_atom.filter_lambda_.accumulated_weight;
|
||||
accumulated_weight->MapTo(symbol_table_->CreateSymbol(
|
||||
accumulated_weight->name_, accumulated_weight->user_declared_, Symbol::Type::NUMBER));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (edge_atom.weight_lambda_.expression) {
|
||||
VisitWithIdentifiers(edge_atom.weight_lambda_.expression,
|
||||
|
@ -1138,6 +1138,11 @@ class ExpandVariableCursor : public Cursor {
|
||||
edges_it_.emplace_back(edges_.back().begin());
|
||||
}
|
||||
|
||||
if (self_.filter_lambda_.accumulated_path_symbol) {
|
||||
// Add initial vertex of path to the accumulated path
|
||||
frame[self_.filter_lambda_.accumulated_path_symbol.value()] = Path(vertex);
|
||||
}
|
||||
|
||||
// reset the frame value to an empty edge list
|
||||
auto *pull_memory = context.evaluation_context.memory;
|
||||
frame[self_.common_.edge_symbol] = TypedValue::TVector(pull_memory);
|
||||
@ -1234,6 +1239,13 @@ class ExpandVariableCursor : public Cursor {
|
||||
// Skip expanding out of filtered expansion.
|
||||
frame[self_.filter_lambda_.inner_edge_symbol] = current_edge.first;
|
||||
frame[self_.filter_lambda_.inner_node_symbol] = current_vertex;
|
||||
if (self_.filter_lambda_.accumulated_path_symbol) {
|
||||
MG_ASSERT(frame[self_.filter_lambda_.accumulated_path_symbol.value()].IsPath(),
|
||||
"Accumulated path must be path");
|
||||
Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath();
|
||||
accumulated_path.Expand(current_edge.first);
|
||||
accumulated_path.Expand(current_vertex);
|
||||
}
|
||||
if (self_.filter_lambda_.expression && !EvaluateFilter(evaluator, self_.filter_lambda_.expression)) continue;
|
||||
|
||||
// we are doing depth-first search, so place the current
|
||||
@ -1546,6 +1558,13 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
||||
#endif
|
||||
frame[self_.filter_lambda_.inner_edge_symbol] = edge;
|
||||
frame[self_.filter_lambda_.inner_node_symbol] = vertex;
|
||||
if (self_.filter_lambda_.accumulated_path_symbol) {
|
||||
MG_ASSERT(frame[self_.filter_lambda_.accumulated_path_symbol.value()].IsPath(),
|
||||
"Accumulated path must have Path type");
|
||||
Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath();
|
||||
accumulated_path.Expand(edge);
|
||||
accumulated_path.Expand(vertex);
|
||||
}
|
||||
|
||||
if (self_.filter_lambda_.expression) {
|
||||
TypedValue result = self_.filter_lambda_.expression->Accept(evaluator);
|
||||
@ -1607,6 +1626,11 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
||||
const auto &vertex = vertex_value.ValueVertex();
|
||||
processed_.emplace(vertex, std::nullopt);
|
||||
|
||||
if (self_.filter_lambda_.accumulated_path_symbol) {
|
||||
// Add initial vertex of path to the accumulated path
|
||||
frame[self_.filter_lambda_.accumulated_path_symbol.value()] = Path(vertex);
|
||||
}
|
||||
|
||||
expand_from_vertex(vertex);
|
||||
|
||||
// go back to loop start and see if we expanded anything
|
||||
@ -1677,6 +1701,10 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
||||
namespace {
|
||||
|
||||
void CheckWeightType(TypedValue current_weight, utils::MemoryResource *memory) {
|
||||
if (current_weight.IsNull()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!current_weight.IsNumeric() && !current_weight.IsDuration()) {
|
||||
throw QueryRuntimeException("Calculated weight must be numeric or a Duration, got {}.", current_weight.type());
|
||||
}
|
||||
@ -1694,6 +1722,34 @@ void CheckWeightType(TypedValue current_weight, utils::MemoryResource *memory) {
|
||||
}
|
||||
}
|
||||
|
||||
void ValidateWeightTypes(const TypedValue &lhs, const TypedValue &rhs) {
|
||||
if ((lhs.IsNumeric() && rhs.IsNumeric()) || (lhs.IsDuration() && rhs.IsDuration())) {
|
||||
return;
|
||||
}
|
||||
throw QueryRuntimeException(utils::MessageWithLink(
|
||||
"All weights should be of the same type, either numeric or a Duration. Please update the weight "
|
||||
"expression or the filter expression.",
|
||||
"https://memgr.ph/wsp"));
|
||||
}
|
||||
|
||||
TypedValue CalculateNextWeight(const std::optional<memgraph::query::plan::ExpansionLambda> &weight_lambda,
|
||||
const TypedValue &total_weight, ExpressionEvaluator evaluator) {
|
||||
if (!weight_lambda) {
|
||||
return {};
|
||||
}
|
||||
auto *memory = evaluator.GetMemoryResource();
|
||||
TypedValue current_weight = weight_lambda->expression->Accept(evaluator);
|
||||
CheckWeightType(current_weight, memory);
|
||||
|
||||
if (total_weight.IsNull()) {
|
||||
return current_weight;
|
||||
}
|
||||
|
||||
ValidateWeightTypes(current_weight, total_weight);
|
||||
|
||||
return TypedValue(current_weight, memory) + total_weight;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
@ -1722,7 +1778,6 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
auto expand_pair = [this, &evaluator, &frame, &create_state, &context](
|
||||
const EdgeAccessor &edge, const VertexAccessor &vertex, const TypedValue &total_weight,
|
||||
int64_t depth) {
|
||||
auto *memory = evaluator.GetMemoryResource();
|
||||
#ifdef MG_ENTERPRISE
|
||||
if (license::global_license_checker.IsEnterpriseValidFast() && context.auth_checker &&
|
||||
!(context.auth_checker->Has(vertex, storage::View::OLD,
|
||||
@ -1731,32 +1786,31 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
frame[self_.weight_lambda_->inner_edge_symbol] = edge;
|
||||
frame[self_.weight_lambda_->inner_node_symbol] = vertex;
|
||||
TypedValue next_weight = CalculateNextWeight(self_.weight_lambda_, total_weight, evaluator);
|
||||
|
||||
if (self_.filter_lambda_.expression) {
|
||||
frame[self_.filter_lambda_.inner_edge_symbol] = edge;
|
||||
frame[self_.filter_lambda_.inner_node_symbol] = vertex;
|
||||
if (self_.filter_lambda_.accumulated_path_symbol) {
|
||||
MG_ASSERT(frame[self_.filter_lambda_.accumulated_path_symbol.value()].IsPath(),
|
||||
"Accumulated path must be path");
|
||||
Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath();
|
||||
accumulated_path.Expand(edge);
|
||||
accumulated_path.Expand(vertex);
|
||||
|
||||
if (self_.filter_lambda_.accumulated_weight_symbol) {
|
||||
frame[self_.filter_lambda_.accumulated_weight_symbol.value()] = next_weight;
|
||||
}
|
||||
}
|
||||
|
||||
if (!EvaluateFilter(evaluator, self_.filter_lambda_.expression)) return;
|
||||
}
|
||||
|
||||
frame[self_.weight_lambda_->inner_edge_symbol] = edge;
|
||||
frame[self_.weight_lambda_->inner_node_symbol] = vertex;
|
||||
|
||||
TypedValue current_weight = self_.weight_lambda_->expression->Accept(evaluator);
|
||||
|
||||
CheckWeightType(current_weight, memory);
|
||||
|
||||
auto next_state = create_state(vertex, depth);
|
||||
|
||||
TypedValue next_weight = std::invoke([&] {
|
||||
if (total_weight.IsNull()) {
|
||||
return current_weight;
|
||||
}
|
||||
|
||||
ValidateWeightTypes(current_weight, total_weight);
|
||||
|
||||
return TypedValue(current_weight, memory) + total_weight;
|
||||
});
|
||||
|
||||
auto found_it = total_cost_.find(next_state);
|
||||
if (found_it != total_cost_.end() && (found_it->second.IsNull() || (found_it->second <= next_weight).ValueBool()))
|
||||
return;
|
||||
@ -1796,6 +1850,10 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
// Skip expansion for such nodes.
|
||||
if (node.IsNull()) continue;
|
||||
}
|
||||
if (self_.filter_lambda_.accumulated_path_symbol) {
|
||||
// Add initial vertex of path to the accumulated path
|
||||
frame[self_.filter_lambda_.accumulated_path_symbol.value()] = Path(vertex);
|
||||
}
|
||||
if (self_.upper_bound_) {
|
||||
upper_bound_ = EvaluateInt(&evaluator, self_.upper_bound_, "Max depth in weighted shortest path expansion");
|
||||
upper_bound_set_ = true;
|
||||
@ -1808,12 +1866,17 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
"Maximum depth in weighted shortest path expansion must be at "
|
||||
"least 1.");
|
||||
|
||||
frame[self_.weight_lambda_->inner_edge_symbol] = TypedValue();
|
||||
frame[self_.weight_lambda_->inner_node_symbol] = vertex;
|
||||
TypedValue current_weight =
|
||||
CalculateNextWeight(self_.weight_lambda_, /* total_weight */ TypedValue(), evaluator);
|
||||
|
||||
// Clear existing data structures.
|
||||
previous_.clear();
|
||||
total_cost_.clear();
|
||||
yielded_vertices_.clear();
|
||||
|
||||
pq_.emplace(TypedValue(), 0, vertex, std::nullopt);
|
||||
pq_.emplace(current_weight, 0, vertex, std::nullopt);
|
||||
// We are adding the starting vertex to the set of yielded vertices
|
||||
// because we don't want to yield paths that end with the starting
|
||||
// vertex.
|
||||
@ -1913,15 +1976,6 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
// Keeps track of vertices for which we yielded a path already.
|
||||
utils::pmr::unordered_set<VertexAccessor> yielded_vertices_;
|
||||
|
||||
static void ValidateWeightTypes(const TypedValue &lhs, const TypedValue &rhs) {
|
||||
if (!((lhs.IsNumeric() && lhs.IsNumeric()) || (rhs.IsDuration() && rhs.IsDuration()))) {
|
||||
throw QueryRuntimeException(utils::MessageWithLink(
|
||||
"All weights should be of the same type, either numeric or a Duration. Please update the weight "
|
||||
"expression or the filter expression.",
|
||||
"https://memgr.ph/wsp"));
|
||||
}
|
||||
}
|
||||
|
||||
// Priority queue comparator. Keep lowest weight on top of the queue.
|
||||
class PriorityQueueComparator {
|
||||
public:
|
||||
@ -1979,36 +2033,32 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
||||
// queue.
|
||||
auto expand_vertex = [this, &evaluator, &frame](const EdgeAccessor &edge, const EdgeAtom::Direction direction,
|
||||
const TypedValue &total_weight, int64_t depth) {
|
||||
auto *memory = evaluator.GetMemoryResource();
|
||||
|
||||
auto const &next_vertex = direction == EdgeAtom::Direction::IN ? edge.From() : edge.To();
|
||||
|
||||
// Evaluate current weight
|
||||
frame[self_.weight_lambda_->inner_edge_symbol] = edge;
|
||||
frame[self_.weight_lambda_->inner_node_symbol] = next_vertex;
|
||||
TypedValue next_weight = CalculateNextWeight(self_.weight_lambda_, total_weight, evaluator);
|
||||
|
||||
// If filter expression exists, evaluate filter
|
||||
if (self_.filter_lambda_.expression) {
|
||||
frame[self_.filter_lambda_.inner_edge_symbol] = edge;
|
||||
frame[self_.filter_lambda_.inner_node_symbol] = next_vertex;
|
||||
if (self_.filter_lambda_.accumulated_path_symbol) {
|
||||
MG_ASSERT(frame[self_.filter_lambda_.accumulated_path_symbol.value()].IsPath(),
|
||||
"Accumulated path must be path");
|
||||
Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath();
|
||||
accumulated_path.Expand(edge);
|
||||
accumulated_path.Expand(next_vertex);
|
||||
|
||||
if (self_.filter_lambda_.accumulated_weight_symbol) {
|
||||
frame[self_.filter_lambda_.accumulated_weight_symbol.value()] = next_weight;
|
||||
}
|
||||
}
|
||||
|
||||
if (!EvaluateFilter(evaluator, self_.filter_lambda_.expression)) return;
|
||||
}
|
||||
|
||||
// Evaluate current weight
|
||||
frame[self_.weight_lambda_->inner_edge_symbol] = edge;
|
||||
frame[self_.weight_lambda_->inner_node_symbol] = next_vertex;
|
||||
|
||||
TypedValue current_weight = self_.weight_lambda_->expression->Accept(evaluator);
|
||||
|
||||
CheckWeightType(current_weight, memory);
|
||||
|
||||
TypedValue next_weight = std::invoke([&] {
|
||||
if (total_weight.IsNull()) {
|
||||
return current_weight;
|
||||
}
|
||||
|
||||
ValidateWeightTypes(current_weight, total_weight);
|
||||
|
||||
return TypedValue(current_weight, memory) + total_weight;
|
||||
});
|
||||
|
||||
auto found_it = visited_cost_.find(next_vertex);
|
||||
// Check if the vertex has already been processed.
|
||||
if (found_it != visited_cost_.end()) {
|
||||
@ -2200,7 +2250,17 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
||||
traversal_stack_.clear();
|
||||
total_cost_.clear();
|
||||
|
||||
expand_from_vertex(*start_vertex, TypedValue(), 0);
|
||||
if (self_.filter_lambda_.accumulated_path_symbol) {
|
||||
// Add initial vertex of path to the accumulated path
|
||||
frame[self_.filter_lambda_.accumulated_path_symbol.value()] = Path(*start_vertex);
|
||||
}
|
||||
|
||||
frame[self_.weight_lambda_->inner_edge_symbol] = TypedValue();
|
||||
frame[self_.weight_lambda_->inner_node_symbol] = *start_vertex;
|
||||
TypedValue current_weight =
|
||||
CalculateNextWeight(self_.weight_lambda_, /* total_weight */ TypedValue(), evaluator);
|
||||
|
||||
expand_from_vertex(*start_vertex, current_weight, 0);
|
||||
visited_cost_.emplace(*start_vertex, 0);
|
||||
frame[self_.common_.edge_symbol] = TypedValue::TVector(memory);
|
||||
}
|
||||
@ -2252,15 +2312,6 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
||||
// Stack indicating the traversal level.
|
||||
utils::pmr::list<utils::pmr::list<DirectedEdge>> traversal_stack_;
|
||||
|
||||
static void ValidateWeightTypes(const TypedValue &lhs, const TypedValue &rhs) {
|
||||
if (!((lhs.IsNumeric() && lhs.IsNumeric()) || (rhs.IsDuration() && rhs.IsDuration()))) {
|
||||
throw QueryRuntimeException(utils::MessageWithLink(
|
||||
"All weights should be of the same type, either numeric or a Duration. Please update the weight "
|
||||
"expression or the filter expression.",
|
||||
"https://memgr.ph/wsp"));
|
||||
}
|
||||
}
|
||||
|
||||
// Priority queue comparator. Keep lowest weight on top of the queue.
|
||||
class PriorityQueueComparator {
|
||||
public:
|
||||
|
@ -917,12 +917,18 @@ struct ExpansionLambda {
|
||||
Symbol inner_node_symbol;
|
||||
/// Expression used in lambda during expansion.
|
||||
Expression *expression;
|
||||
/// Currently expanded accumulated path symbol.
|
||||
std::optional<Symbol> accumulated_path_symbol;
|
||||
/// Currently expanded accumulated weight symbol.
|
||||
std::optional<Symbol> accumulated_weight_symbol;
|
||||
|
||||
ExpansionLambda Clone(AstStorage *storage) const {
|
||||
ExpansionLambda object;
|
||||
object.inner_edge_symbol = inner_edge_symbol;
|
||||
object.inner_node_symbol = inner_node_symbol;
|
||||
object.expression = expression ? expression->Clone(storage) : nullptr;
|
||||
object.accumulated_path_symbol = accumulated_path_symbol;
|
||||
object.accumulated_weight_symbol = accumulated_weight_symbol;
|
||||
return object;
|
||||
}
|
||||
};
|
||||
|
@ -74,6 +74,13 @@ std::vector<Expansion> NormalizePatterns(const SymbolTable &symbol_table, const
|
||||
// Remove symbols which are bound by lambda arguments.
|
||||
collector.symbols_.erase(symbol_table.at(*edge->filter_lambda_.inner_edge));
|
||||
collector.symbols_.erase(symbol_table.at(*edge->filter_lambda_.inner_node));
|
||||
if (edge->filter_lambda_.accumulated_path) {
|
||||
collector.symbols_.erase(symbol_table.at(*edge->filter_lambda_.accumulated_path));
|
||||
|
||||
if (edge->filter_lambda_.accumulated_weight) {
|
||||
collector.symbols_.erase(symbol_table.at(*edge->filter_lambda_.accumulated_weight));
|
||||
}
|
||||
}
|
||||
if (edge->type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH ||
|
||||
edge->type_ == EdgeAtom::Type::ALL_SHORTEST_PATHS) {
|
||||
collector.symbols_.erase(symbol_table.at(*edge->weight_lambda_.inner_edge));
|
||||
@ -295,6 +302,13 @@ void Filters::CollectPatternFilters(Pattern &pattern, SymbolTable &symbol_table,
|
||||
prop_pair.second->Accept(collector);
|
||||
collector.symbols_.emplace(symbol_table.at(*atom->filter_lambda_.inner_node));
|
||||
collector.symbols_.emplace(symbol_table.at(*atom->filter_lambda_.inner_edge));
|
||||
if (atom->filter_lambda_.accumulated_path) {
|
||||
collector.symbols_.emplace(symbol_table.at(*atom->filter_lambda_.accumulated_path));
|
||||
|
||||
if (atom->filter_lambda_.accumulated_weight) {
|
||||
collector.symbols_.emplace(symbol_table.at(*atom->filter_lambda_.accumulated_weight));
|
||||
}
|
||||
}
|
||||
// First handle the inline property filter.
|
||||
auto *property_lookup = storage.Create<PropertyLookup>(atom->filter_lambda_.inner_edge, prop_pair.first);
|
||||
auto *prop_equal = storage.Create<EqualOperator>(property_lookup, prop_pair.second);
|
||||
|
@ -171,6 +171,11 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor {
|
||||
if (expand.common_.existing_node) {
|
||||
return true;
|
||||
}
|
||||
if (expand.type_ == EdgeAtom::Type::BREADTH_FIRST && expand.filter_lambda_.accumulated_path_symbol) {
|
||||
// When accumulated path is used, we cannot use ST shortest path algorithm.
|
||||
return false;
|
||||
}
|
||||
|
||||
std::unique_ptr<ScanAll> indexed_scan;
|
||||
ScanAll dst_scan(expand.input(), expand.common_.node_symbol, storage::View::OLD);
|
||||
// With expand to existing we only get real gains with BFS, because we use a
|
||||
|
@ -705,9 +705,9 @@ class RuleBasedPlanner {
|
||||
std::optional<Symbol> total_weight;
|
||||
|
||||
if (edge->type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH || edge->type_ == EdgeAtom::Type::ALL_SHORTEST_PATHS) {
|
||||
weight_lambda.emplace(ExpansionLambda{symbol_table.at(*edge->weight_lambda_.inner_edge),
|
||||
symbol_table.at(*edge->weight_lambda_.inner_node),
|
||||
edge->weight_lambda_.expression});
|
||||
weight_lambda.emplace(ExpansionLambda{.inner_edge_symbol = symbol_table.at(*edge->weight_lambda_.inner_edge),
|
||||
.inner_node_symbol = symbol_table.at(*edge->weight_lambda_.inner_node),
|
||||
.expression = edge->weight_lambda_.expression});
|
||||
|
||||
total_weight.emplace(symbol_table.at(*edge->total_weight_));
|
||||
}
|
||||
@ -715,12 +715,28 @@ class RuleBasedPlanner {
|
||||
ExpansionLambda filter_lambda;
|
||||
filter_lambda.inner_edge_symbol = symbol_table.at(*edge->filter_lambda_.inner_edge);
|
||||
filter_lambda.inner_node_symbol = symbol_table.at(*edge->filter_lambda_.inner_node);
|
||||
if (edge->filter_lambda_.accumulated_path) {
|
||||
filter_lambda.accumulated_path_symbol = symbol_table.at(*edge->filter_lambda_.accumulated_path);
|
||||
|
||||
if (edge->filter_lambda_.accumulated_weight) {
|
||||
filter_lambda.accumulated_weight_symbol = symbol_table.at(*edge->filter_lambda_.accumulated_weight);
|
||||
}
|
||||
}
|
||||
{
|
||||
// Bind the inner edge and node symbols so they're available for
|
||||
// inline filtering in ExpandVariable.
|
||||
bool inner_edge_bound = bound_symbols.insert(filter_lambda.inner_edge_symbol).second;
|
||||
bool inner_node_bound = bound_symbols.insert(filter_lambda.inner_node_symbol).second;
|
||||
MG_ASSERT(inner_edge_bound && inner_node_bound, "An inner edge and node can't be bound from before");
|
||||
if (filter_lambda.accumulated_path_symbol) {
|
||||
bool accumulated_path_bound = bound_symbols.insert(*filter_lambda.accumulated_path_symbol).second;
|
||||
MG_ASSERT(accumulated_path_bound, "The accumulated path can't be bound from before");
|
||||
|
||||
if (filter_lambda.accumulated_weight_symbol) {
|
||||
bool accumulated_weight_bound = bound_symbols.insert(*filter_lambda.accumulated_weight_symbol).second;
|
||||
MG_ASSERT(accumulated_weight_bound, "The accumulated weight can't be bound from before");
|
||||
}
|
||||
}
|
||||
}
|
||||
// Join regular filters with lambda filter expression, so that they
|
||||
// are done inline together. Semantic analysis should guarantee that
|
||||
@ -731,15 +747,34 @@ class RuleBasedPlanner {
|
||||
// filtering (they use the inner symbols. If they were not collected,
|
||||
// we have to remove them manually because no other filter-extraction
|
||||
// will ever bind them again.
|
||||
filters.erase(
|
||||
std::remove_if(filters.begin(), filters.end(),
|
||||
[e = filter_lambda.inner_edge_symbol, n = filter_lambda.inner_node_symbol](FilterInfo &fi) {
|
||||
return utils::Contains(fi.used_symbols, e) || utils::Contains(fi.used_symbols, n);
|
||||
}),
|
||||
filters.end());
|
||||
std::vector<Symbol> inner_symbols = {filter_lambda.inner_edge_symbol, filter_lambda.inner_node_symbol};
|
||||
if (filter_lambda.accumulated_path_symbol) {
|
||||
inner_symbols.emplace_back(*filter_lambda.accumulated_path_symbol);
|
||||
|
||||
if (filter_lambda.accumulated_weight_symbol) {
|
||||
inner_symbols.emplace_back(*filter_lambda.accumulated_weight_symbol);
|
||||
}
|
||||
}
|
||||
|
||||
filters.erase(std::remove_if(filters.begin(), filters.end(),
|
||||
[&inner_symbols](FilterInfo &fi) {
|
||||
for (const auto &symbol : inner_symbols) {
|
||||
if (utils::Contains(fi.used_symbols, symbol)) return true;
|
||||
}
|
||||
return false;
|
||||
}),
|
||||
filters.end());
|
||||
|
||||
// Unbind the temporarily bound inner symbols for filtering.
|
||||
bound_symbols.erase(filter_lambda.inner_edge_symbol);
|
||||
bound_symbols.erase(filter_lambda.inner_node_symbol);
|
||||
if (filter_lambda.accumulated_path_symbol) {
|
||||
bound_symbols.erase(*filter_lambda.accumulated_path_symbol);
|
||||
|
||||
if (filter_lambda.accumulated_weight_symbol) {
|
||||
bound_symbols.erase(*filter_lambda.accumulated_weight_symbol);
|
||||
}
|
||||
}
|
||||
|
||||
if (total_weight) {
|
||||
bound_symbols.insert(*total_weight);
|
||||
|
@ -72,8 +72,9 @@ void AddNextExpansions(const Symbol &node_symbol, const Matching &matching, cons
|
||||
// We are not expanding from node1, so flip the expansion.
|
||||
DMG_ASSERT(expansion.node2 && symbol_table.at(*expansion.node2->identifier_) == node_symbol,
|
||||
"Expected node_symbol to be bound in node2");
|
||||
if (expansion.edge->type_ != EdgeAtom::Type::BREADTH_FIRST) {
|
||||
if (expansion.edge->type_ != EdgeAtom::Type::BREADTH_FIRST && !expansion.edge->filter_lambda_.accumulated_path) {
|
||||
// BFS must *not* be flipped. Doing that changes the BFS results.
|
||||
// When filter lambda uses accumulated path, path must not be flipped.
|
||||
std::swap(expansion.node1, expansion.node2);
|
||||
expansion.is_flipped = true;
|
||||
if (expansion.direction != EdgeAtom::Direction::BOTH) {
|
||||
|
@ -29,4 +29,8 @@ Constraints::Constraints(const Config &config, StorageMode storage_mode) {
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
void Constraints::AbortEntries(std::span<Vertex const *const> vertices, uint64_t exact_start_timestamp) const {
|
||||
static_cast<InMemoryUniqueConstraints *>(unique_constraints_.get())->AbortEntries(vertices, exact_start_timestamp);
|
||||
}
|
||||
} // namespace memgraph::storage
|
||||
|
@ -11,6 +11,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <span>
|
||||
|
||||
#include "storage/v2/config.hpp"
|
||||
#include "storage/v2/constraints/existence_constraints.hpp"
|
||||
#include "storage/v2/constraints/unique_constraints.hpp"
|
||||
@ -27,6 +29,8 @@ struct Constraints {
|
||||
Constraints &operator=(Constraints &&) = delete;
|
||||
~Constraints() = default;
|
||||
|
||||
void AbortEntries(std::span<Vertex const *const> vertices, uint64_t exact_start_timestamp) const;
|
||||
|
||||
std::unique_ptr<ExistenceConstraints> existence_constraints_;
|
||||
std::unique_ptr<UniqueConstraints> unique_constraints_;
|
||||
};
|
||||
|
@ -10,7 +10,9 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "storage/v2/disk//edge_import_mode_cache.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
#include "storage/v2/disk/label_property_index.hpp"
|
||||
#include "storage/v2/indices/indices.hpp"
|
||||
#include "storage/v2/inmemory/label_index.hpp"
|
||||
@ -28,7 +30,7 @@ EdgeImportModeCache::EdgeImportModeCache(const Config &config)
|
||||
InMemoryLabelIndex::Iterable EdgeImportModeCache::Vertices(LabelId label, View view, Storage *storage,
|
||||
Transaction *transaction) const {
|
||||
auto *mem_label_index = static_cast<InMemoryLabelIndex *>(in_memory_indices_.label_index_.get());
|
||||
return mem_label_index->Vertices(label, view, storage, transaction);
|
||||
return mem_label_index->Vertices(label, vertices_.access(), view, storage, transaction);
|
||||
}
|
||||
|
||||
InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices(
|
||||
@ -37,7 +39,8 @@ InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices(
|
||||
Transaction *transaction) const {
|
||||
auto *mem_label_property_index =
|
||||
static_cast<InMemoryLabelPropertyIndex *>(in_memory_indices_.label_property_index_.get());
|
||||
return mem_label_property_index->Vertices(label, property, lower_bound, upper_bound, view, storage, transaction);
|
||||
return mem_label_property_index->Vertices(label, property, vertices_.access(), lower_bound, upper_bound, view,
|
||||
storage, transaction);
|
||||
}
|
||||
|
||||
bool EdgeImportModeCache::CreateIndex(LabelId label, PropertyId property,
|
||||
|
@ -71,6 +71,37 @@
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
namespace {
|
||||
|
||||
auto FindEdges(const View view, EdgeTypeId edge_type, const VertexAccessor *from_vertex, VertexAccessor *to_vertex)
|
||||
-> Result<EdgesVertexAccessorResult> {
|
||||
auto use_out_edges = [](Vertex const *from_vertex, Vertex const *to_vertex) {
|
||||
// Obtain the locks by `gid` order to avoid lock cycles.
|
||||
auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock};
|
||||
auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock};
|
||||
if (from_vertex->gid < to_vertex->gid) {
|
||||
guard_from.lock();
|
||||
guard_to.lock();
|
||||
} else if (from_vertex->gid > to_vertex->gid) {
|
||||
guard_to.lock();
|
||||
guard_from.lock();
|
||||
} else {
|
||||
// The vertices are the same vertex, only lock one.
|
||||
guard_from.lock();
|
||||
}
|
||||
|
||||
// With the potentially cheaper side FindEdges
|
||||
const auto out_n = from_vertex->out_edges.size();
|
||||
const auto in_n = to_vertex->in_edges.size();
|
||||
return out_n <= in_n;
|
||||
};
|
||||
|
||||
return use_out_edges(from_vertex->vertex_, to_vertex->vertex_) ? from_vertex->OutEdges(view, {edge_type}, to_vertex)
|
||||
: to_vertex->InEdges(view, {edge_type}, from_vertex);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
|
||||
|
||||
namespace {
|
||||
@ -952,6 +983,20 @@ Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from,
|
||||
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
|
||||
}
|
||||
|
||||
std::optional<EdgeAccessor> DiskStorage::DiskAccessor::FindEdge(Gid gid, View view, EdgeTypeId edge_type,
|
||||
VertexAccessor *from_vertex,
|
||||
VertexAccessor *to_vertex) {
|
||||
auto res = FindEdges(view, edge_type, from_vertex, to_vertex);
|
||||
if (res.HasError()) return std::nullopt; // TODO: use a Result type
|
||||
|
||||
auto const it = std::ranges::find_if(
|
||||
res->edges, [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.ptr->gid == gid; });
|
||||
|
||||
if (it == res->edges.end()) return std::nullopt; // TODO: use a Result type
|
||||
|
||||
return *it;
|
||||
}
|
||||
|
||||
Result<EdgeAccessor> DiskStorage::DiskAccessor::EdgeSetFrom(EdgeAccessor * /*edge*/, VertexAccessor * /*new_from*/) {
|
||||
MG_ASSERT(false, "EdgeSetFrom is currently only implemented for InMemory storage");
|
||||
return Error::NONEXISTENT_OBJECT;
|
||||
|
@ -121,6 +121,9 @@ class DiskStorage final : public Storage {
|
||||
|
||||
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override;
|
||||
|
||||
std::optional<EdgeAccessor> FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex,
|
||||
VertexAccessor *to_vertex) override;
|
||||
|
||||
Result<EdgeAccessor> EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override;
|
||||
|
||||
Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override;
|
||||
|
@ -17,6 +17,21 @@
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
void Indices::AbortEntries(LabelId labelId, std::span<Vertex *const> vertices, uint64_t exact_start_timestamp) const {
|
||||
static_cast<InMemoryLabelIndex *>(label_index_.get())->AbortEntries(labelId, vertices, exact_start_timestamp);
|
||||
}
|
||||
|
||||
void Indices::AbortEntries(PropertyId property, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
|
||||
uint64_t exact_start_timestamp) const {
|
||||
static_cast<InMemoryLabelPropertyIndex *>(label_property_index_.get())
|
||||
->AbortEntries(property, vertices, exact_start_timestamp);
|
||||
}
|
||||
void Indices::AbortEntries(LabelId label, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
|
||||
uint64_t exact_start_timestamp) const {
|
||||
static_cast<InMemoryLabelPropertyIndex *>(label_property_index_.get())
|
||||
->AbortEntries(label, vertices, exact_start_timestamp);
|
||||
}
|
||||
|
||||
void Indices::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) const {
|
||||
static_cast<InMemoryLabelIndex *>(label_index_.get())->RemoveObsoleteEntries(oldest_active_start_timestamp);
|
||||
static_cast<InMemoryLabelPropertyIndex *>(label_property_index_.get())
|
||||
@ -50,4 +65,8 @@ Indices::Indices(const Config &config, StorageMode storage_mode) {
|
||||
});
|
||||
}
|
||||
|
||||
Indices::IndexStats Indices::Analysis() const {
|
||||
return {static_cast<InMemoryLabelIndex *>(label_index_.get())->Analysis(),
|
||||
static_cast<InMemoryLabelPropertyIndex *>(label_property_index_.get())->Analysis()};
|
||||
}
|
||||
} // namespace memgraph::storage
|
||||
|
@ -12,6 +12,9 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <span>
|
||||
|
||||
#include "storage/v2/id_types.hpp"
|
||||
#include "storage/v2/indices/label_index.hpp"
|
||||
#include "storage/v2/indices/label_property_index.hpp"
|
||||
#include "storage/v2/storage_mode.hpp"
|
||||
@ -32,6 +35,20 @@ struct Indices {
|
||||
/// TODO: unused in disk indices
|
||||
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) const;
|
||||
|
||||
/// Surgical removal of entries that was inserted this transaction
|
||||
/// TODO: unused in disk indices
|
||||
void AbortEntries(LabelId labelId, std::span<Vertex *const> vertices, uint64_t exact_start_timestamp) const;
|
||||
void AbortEntries(PropertyId property, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
|
||||
uint64_t exact_start_timestamp) const;
|
||||
void AbortEntries(LabelId label, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
|
||||
uint64_t exact_start_timestamp) const;
|
||||
|
||||
struct IndexStats {
|
||||
std::vector<LabelId> label;
|
||||
LabelPropertyIndex::IndexStats property_label;
|
||||
};
|
||||
IndexStats Analysis() const;
|
||||
|
||||
// Indices are updated whenever an update occurs, instead of only on commit or
|
||||
// advance command. This is necessary because we want indices to support `NEW`
|
||||
// view for use in Merge.
|
||||
|
@ -19,6 +19,11 @@ namespace memgraph::storage {
|
||||
|
||||
class LabelPropertyIndex {
|
||||
public:
|
||||
struct IndexStats {
|
||||
std::map<LabelId, std::vector<PropertyId>> l2p;
|
||||
std::map<PropertyId, std::vector<LabelId>> p2l;
|
||||
};
|
||||
|
||||
LabelPropertyIndex() = default;
|
||||
LabelPropertyIndex(const LabelPropertyIndex &) = delete;
|
||||
LabelPropertyIndex(LabelPropertyIndex &&) = delete;
|
||||
|
@ -10,8 +10,12 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "storage/v2/inmemory/label_index.hpp"
|
||||
|
||||
#include <span>
|
||||
|
||||
#include "storage/v2/constraints/constraints.hpp"
|
||||
#include "storage/v2/indices/indices_utils.hpp"
|
||||
#include "storage/v2/inmemory/storage.hpp"
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
@ -96,9 +100,23 @@ void InMemoryLabelIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_time
|
||||
}
|
||||
}
|
||||
|
||||
InMemoryLabelIndex::Iterable::Iterable(utils::SkipList<Entry>::Accessor index_accessor, LabelId label, View view,
|
||||
Storage *storage, Transaction *transaction)
|
||||
: index_accessor_(std::move(index_accessor)),
|
||||
void InMemoryLabelIndex::AbortEntries(LabelId labelId, std::span<Vertex *const> vertices,
|
||||
uint64_t exact_start_timestamp) {
|
||||
auto const it = index_.find(labelId);
|
||||
if (it == index_.end()) return;
|
||||
|
||||
auto &label_storage = it->second;
|
||||
auto vertices_acc = label_storage.access();
|
||||
for (auto *vertex : vertices) {
|
||||
vertices_acc.remove(Entry{vertex, exact_start_timestamp});
|
||||
}
|
||||
}
|
||||
|
||||
InMemoryLabelIndex::Iterable::Iterable(utils::SkipList<Entry>::Accessor index_accessor,
|
||||
utils::SkipList<Vertex>::ConstAccessor vertices_accessor, LabelId label,
|
||||
View view, Storage *storage, Transaction *transaction)
|
||||
: pin_accessor_(std::move(vertices_accessor)),
|
||||
index_accessor_(std::move(index_accessor)),
|
||||
label_(label),
|
||||
view_(view),
|
||||
storage_(storage),
|
||||
@ -147,9 +165,21 @@ void InMemoryLabelIndex::RunGC() {
|
||||
|
||||
InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices(LabelId label, View view, Storage *storage,
|
||||
Transaction *transaction) {
|
||||
DMG_ASSERT(storage->storage_mode_ == StorageMode::IN_MEMORY_TRANSACTIONAL ||
|
||||
storage->storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL,
|
||||
"LabelIndex trying to access InMemory vertices from OnDisk!");
|
||||
auto vertices_acc = static_cast<InMemoryStorage const *>(storage)->vertices_.access();
|
||||
const auto it = index_.find(label);
|
||||
MG_ASSERT(it != index_.end(), "Index for label {} doesn't exist", label.AsUint());
|
||||
return {it->second.access(), label, view, storage, transaction};
|
||||
return {it->second.access(), std::move(vertices_acc), label, view, storage, transaction};
|
||||
}
|
||||
|
||||
InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices(
|
||||
LabelId label, memgraph::utils::SkipList<memgraph::storage::Vertex>::ConstAccessor vertices_acc, View view,
|
||||
Storage *storage, Transaction *transaction) {
|
||||
const auto it = index_.find(label);
|
||||
MG_ASSERT(it != index_.end(), "Index for label {} doesn't exist", label.AsUint());
|
||||
return {it->second.access(), std::move(vertices_acc), label, view, storage, transaction};
|
||||
}
|
||||
|
||||
void InMemoryLabelIndex::SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats) {
|
||||
@ -187,4 +217,12 @@ bool InMemoryLabelIndex::DeleteIndexStats(const storage::LabelId &label) {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<LabelId> InMemoryLabelIndex::Analysis() const {
|
||||
std::vector<LabelId> res;
|
||||
res.reserve(index_.size());
|
||||
for (const auto &[label, _] : index_) {
|
||||
res.emplace_back(label);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
} // namespace memgraph::storage
|
||||
|
@ -11,6 +11,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <span>
|
||||
|
||||
#include "storage/v2/constraints/constraints.hpp"
|
||||
#include "storage/v2/indices/label_index.hpp"
|
||||
#include "storage/v2/indices/label_index_stats.hpp"
|
||||
@ -56,10 +58,15 @@ class InMemoryLabelIndex : public storage::LabelIndex {
|
||||
|
||||
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp);
|
||||
|
||||
/// Surgical removal of entries that was inserted this transaction
|
||||
void AbortEntries(LabelId labelId, std::span<Vertex *const> vertices, uint64_t exact_start_timestamp);
|
||||
|
||||
std::vector<LabelId> Analysis() const;
|
||||
|
||||
class Iterable {
|
||||
public:
|
||||
Iterable(utils::SkipList<Entry>::Accessor index_accessor, LabelId label, View view, Storage *storage,
|
||||
Transaction *transaction);
|
||||
Iterable(utils::SkipList<Entry>::Accessor index_accessor, utils::SkipList<Vertex>::ConstAccessor vertices_accessor,
|
||||
LabelId label, View view, Storage *storage, Transaction *transaction);
|
||||
|
||||
class Iterator {
|
||||
public:
|
||||
@ -85,6 +92,7 @@ class InMemoryLabelIndex : public storage::LabelIndex {
|
||||
Iterator end() { return {this, index_accessor_.end()}; }
|
||||
|
||||
private:
|
||||
utils::SkipList<Vertex>::ConstAccessor pin_accessor_;
|
||||
utils::SkipList<Entry>::Accessor index_accessor_;
|
||||
LabelId label_;
|
||||
View view_;
|
||||
@ -98,6 +106,9 @@ class InMemoryLabelIndex : public storage::LabelIndex {
|
||||
|
||||
Iterable Vertices(LabelId label, View view, Storage *storage, Transaction *transaction);
|
||||
|
||||
Iterable Vertices(LabelId label, memgraph::utils::SkipList<memgraph::storage::Vertex>::ConstAccessor vertices_acc,
|
||||
View view, Storage *storage, Transaction *transaction);
|
||||
|
||||
void SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats);
|
||||
|
||||
std::optional<storage::LabelIndexStats> GetIndexStats(const storage::LabelId &label) const;
|
||||
|
@ -12,6 +12,8 @@
|
||||
#include "storage/v2/inmemory/label_property_index.hpp"
|
||||
#include "storage/v2/constraints/constraints.hpp"
|
||||
#include "storage/v2/indices/indices_utils.hpp"
|
||||
#include "storage/v2/inmemory/storage.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
@ -101,11 +103,12 @@ void InMemoryLabelPropertyIndex::UpdateOnSetProperty(PropertyId property, const
|
||||
return;
|
||||
}
|
||||
|
||||
if (!indices_by_property_.contains(property)) {
|
||||
auto index = indices_by_property_.find(property);
|
||||
if (index == indices_by_property_.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const auto &[_, storage] : indices_by_property_.at(property)) {
|
||||
for (const auto &[_, storage] : index->second) {
|
||||
auto acc = storage->access();
|
||||
acc.insert(Entry{value, vertex, tx.start_timestamp});
|
||||
}
|
||||
@ -220,12 +223,14 @@ const PropertyValue kSmallestMap = PropertyValue(std::map<std::string, PropertyV
|
||||
const PropertyValue kSmallestTemporalData =
|
||||
PropertyValue(TemporalData{static_cast<TemporalType>(0), std::numeric_limits<int64_t>::min()});
|
||||
|
||||
InMemoryLabelPropertyIndex::Iterable::Iterable(utils::SkipList<Entry>::Accessor index_accessor, LabelId label,
|
||||
InMemoryLabelPropertyIndex::Iterable::Iterable(utils::SkipList<Entry>::Accessor index_accessor,
|
||||
utils::SkipList<Vertex>::ConstAccessor vertices_accessor, LabelId label,
|
||||
PropertyId property,
|
||||
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view,
|
||||
Storage *storage, Transaction *transaction)
|
||||
: index_accessor_(std::move(index_accessor)),
|
||||
: pin_accessor_(std::move(vertices_accessor)),
|
||||
index_accessor_(std::move(index_accessor)),
|
||||
label_(label),
|
||||
property_(property),
|
||||
lower_bound_(lower_bound),
|
||||
@ -428,9 +433,57 @@ InMemoryLabelPropertyIndex::Iterable InMemoryLabelPropertyIndex::Vertices(
|
||||
LabelId label, PropertyId property, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Storage *storage,
|
||||
Transaction *transaction) {
|
||||
DMG_ASSERT(storage->storage_mode_ == StorageMode::IN_MEMORY_TRANSACTIONAL ||
|
||||
storage->storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL,
|
||||
"PropertyLabel index trying to access InMemory vertices from OnDisk!");
|
||||
auto vertices_acc = static_cast<InMemoryStorage const *>(storage)->vertices_.access();
|
||||
auto it = index_.find({label, property});
|
||||
MG_ASSERT(it != index_.end(), "Index for label {} and property {} doesn't exist", label.AsUint(), property.AsUint());
|
||||
return {it->second.access(), label, property, lower_bound, upper_bound, view, storage, transaction};
|
||||
return {it->second.access(), std::move(vertices_acc), label, property, lower_bound, upper_bound, view, storage,
|
||||
transaction};
|
||||
}
|
||||
|
||||
InMemoryLabelPropertyIndex::Iterable InMemoryLabelPropertyIndex::Vertices(
|
||||
LabelId label, PropertyId property,
|
||||
memgraph::utils::SkipList<memgraph::storage::Vertex>::ConstAccessor vertices_acc,
|
||||
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Storage *storage,
|
||||
Transaction *transaction) {
|
||||
auto it = index_.find({label, property});
|
||||
MG_ASSERT(it != index_.end(), "Index for label {} and property {} doesn't exist", label.AsUint(), property.AsUint());
|
||||
return {it->second.access(), std::move(vertices_acc), label, property, lower_bound, upper_bound, view, storage,
|
||||
transaction};
|
||||
}
|
||||
|
||||
void InMemoryLabelPropertyIndex::AbortEntries(PropertyId property,
|
||||
std::span<std::pair<PropertyValue, Vertex *> const> vertices,
|
||||
uint64_t exact_start_timestamp) {
|
||||
auto const it = indices_by_property_.find(property);
|
||||
if (it == indices_by_property_.end()) return;
|
||||
|
||||
auto &indices = it->second;
|
||||
for (const auto &[_, index] : indices) {
|
||||
auto index_acc = index->access();
|
||||
for (auto const &[value, vertex] : vertices) {
|
||||
index_acc.remove(Entry{value, vertex, exact_start_timestamp});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void InMemoryLabelPropertyIndex::AbortEntries(LabelId label,
|
||||
std::span<std::pair<PropertyValue, Vertex *> const> vertices,
|
||||
uint64_t exact_start_timestamp) {
|
||||
for (auto &[label_prop, storage] : index_) {
|
||||
if (label_prop.first != label) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto index_acc = storage.access();
|
||||
for (const auto &[property, vertex] : vertices) {
|
||||
if (!property.IsNull()) {
|
||||
index_acc.remove(Entry{property, vertex, exact_start_timestamp});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace memgraph::storage
|
||||
|
@ -11,9 +11,13 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <span>
|
||||
|
||||
#include "storage/v2/constraints/constraints.hpp"
|
||||
#include "storage/v2/id_types.hpp"
|
||||
#include "storage/v2/indices/label_property_index.hpp"
|
||||
#include "storage/v2/indices/label_property_index_stats.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "utils/rw_lock.hpp"
|
||||
#include "utils/synchronized.hpp"
|
||||
|
||||
@ -61,10 +65,25 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
|
||||
|
||||
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp);
|
||||
|
||||
void AbortEntries(PropertyId property, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
|
||||
uint64_t exact_start_timestamp);
|
||||
void AbortEntries(LabelId label, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
|
||||
uint64_t exact_start_timestamp);
|
||||
|
||||
IndexStats Analysis() const {
|
||||
IndexStats res{};
|
||||
for (const auto &[lp, _] : index_) {
|
||||
const auto &[label, property] = lp;
|
||||
res.l2p[label].emplace_back(property);
|
||||
res.p2l[property].emplace_back(label);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
class Iterable {
|
||||
public:
|
||||
Iterable(utils::SkipList<Entry>::Accessor index_accessor, LabelId label, PropertyId property,
|
||||
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
Iterable(utils::SkipList<Entry>::Accessor index_accessor, utils::SkipList<Vertex>::ConstAccessor vertices_accessor,
|
||||
LabelId label, PropertyId property, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Storage *storage,
|
||||
Transaction *transaction);
|
||||
|
||||
@ -92,6 +111,7 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
utils::SkipList<Vertex>::ConstAccessor pin_accessor_;
|
||||
utils::SkipList<Entry>::Accessor index_accessor_;
|
||||
LabelId label_;
|
||||
PropertyId property_;
|
||||
@ -131,6 +151,12 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Storage *storage,
|
||||
Transaction *transaction);
|
||||
|
||||
Iterable Vertices(LabelId label, PropertyId property,
|
||||
memgraph::utils::SkipList<memgraph::storage::Vertex>::ConstAccessor vertices_acc,
|
||||
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Storage *storage,
|
||||
Transaction *transaction);
|
||||
|
||||
private:
|
||||
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>> index_;
|
||||
std::unordered_map<PropertyId, std::unordered_map<LabelId, utils::SkipList<Entry> *>> indices_by_property_;
|
||||
|
@ -10,21 +10,57 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "storage/v2/inmemory/storage.hpp"
|
||||
#include <algorithm>
|
||||
#include <functional>
|
||||
#include "dbms/constants.hpp"
|
||||
#include "memory/global_memory_control.hpp"
|
||||
#include "storage/v2/durability/durability.hpp"
|
||||
#include "storage/v2/durability/snapshot.hpp"
|
||||
#include "storage/v2/edge_direction.hpp"
|
||||
#include "storage/v2/id_types.hpp"
|
||||
#include "storage/v2/metadata_delta.hpp"
|
||||
|
||||
/// REPLICATION ///
|
||||
#include "dbms/inmemory/replication_handlers.hpp"
|
||||
#include "storage/v2/inmemory/replication/recovery.hpp"
|
||||
#include "storage/v2/inmemory/unique_constraints.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "utils/resource_lock.hpp"
|
||||
#include "utils/stat.hpp"
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
namespace {
|
||||
|
||||
auto FindEdges(const View view, EdgeTypeId edge_type, const VertexAccessor *from_vertex, VertexAccessor *to_vertex)
|
||||
-> Result<EdgesVertexAccessorResult> {
|
||||
auto use_out_edges = [](Vertex const *from_vertex, Vertex const *to_vertex) {
|
||||
// Obtain the locks by `gid` order to avoid lock cycles.
|
||||
auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock};
|
||||
auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock};
|
||||
if (from_vertex->gid < to_vertex->gid) {
|
||||
guard_from.lock();
|
||||
guard_to.lock();
|
||||
} else if (from_vertex->gid > to_vertex->gid) {
|
||||
guard_to.lock();
|
||||
guard_from.lock();
|
||||
} else {
|
||||
// The vertices are the same vertex, only lock one.
|
||||
guard_from.lock();
|
||||
}
|
||||
|
||||
// With the potentially cheaper side FindEdges
|
||||
const auto out_n = from_vertex->out_edges.size();
|
||||
const auto in_n = to_vertex->in_edges.size();
|
||||
return out_n <= in_n;
|
||||
};
|
||||
|
||||
return use_out_edges(from_vertex->vertex_, to_vertex->vertex_) ? from_vertex->OutEdges(view, {edge_type}, to_vertex)
|
||||
: to_vertex->InEdges(view, {edge_type}, from_vertex);
|
||||
}
|
||||
|
||||
}; // namespace
|
||||
|
||||
using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
|
||||
|
||||
InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
|
||||
@ -318,6 +354,24 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
|
||||
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
|
||||
}
|
||||
|
||||
std::optional<EdgeAccessor> InMemoryStorage::InMemoryAccessor::FindEdge(Gid gid, const View view, EdgeTypeId edge_type,
|
||||
VertexAccessor *from_vertex,
|
||||
VertexAccessor *to_vertex) {
|
||||
auto res = FindEdges(view, edge_type, from_vertex, to_vertex);
|
||||
if (res.HasError()) return std::nullopt; // TODO: use a Result type
|
||||
|
||||
auto const it = std::invoke([this, gid, &res]() {
|
||||
auto const byGid = [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.gid == gid; };
|
||||
auto const byEdgePtr = [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.ptr->gid == gid; };
|
||||
if (config_.properties_on_edges) return std::ranges::find_if(res->edges, byEdgePtr);
|
||||
return std::ranges::find_if(res->edges, byGid);
|
||||
});
|
||||
|
||||
if (it == res->edges.end()) return std::nullopt; // TODO: use a Result type
|
||||
|
||||
return *it;
|
||||
}
|
||||
|
||||
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAccessor *from, VertexAccessor *to,
|
||||
EdgeTypeId edge_type, storage::Gid gid) {
|
||||
MG_ASSERT(from->transaction_ == to->transaction_,
|
||||
@ -704,7 +758,8 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
|
||||
could_replicate_all_sync_replicas =
|
||||
mem_storage->AppendToWalDataDefinition(transaction_, *commit_timestamp_); // protected by engine_guard
|
||||
// TODO: release lock, and update all deltas to have a local copy of the commit timestamp
|
||||
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); // protected by engine_guard
|
||||
transaction_.commit_timestamp->store(*commit_timestamp_,
|
||||
std::memory_order_release); // protected by engine_guard
|
||||
// Replica can only update the last commit timestamp with
|
||||
// the commits received from main.
|
||||
if (is_main || desired_commit_timestamp.has_value()) {
|
||||
@ -830,6 +885,21 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
std::list<Gid> my_deleted_vertices;
|
||||
std::list<Gid> my_deleted_edges;
|
||||
|
||||
std::map<LabelId, std::vector<Vertex *>> label_cleanup;
|
||||
std::map<LabelId, std::vector<std::pair<PropertyValue, Vertex *>>> label_property_cleanup;
|
||||
std::map<PropertyId, std::vector<std::pair<PropertyValue, Vertex *>>> property_cleanup;
|
||||
|
||||
// CONSTRAINTS
|
||||
if (transaction_.constraint_verification_info.NeedsUniqueConstraintVerification()) {
|
||||
// Need to remove elements from constraints before handling of the deltas, so the elements match the correct
|
||||
// values
|
||||
auto vertices_to_check = transaction_.constraint_verification_info.GetVerticesForUniqueConstraintChecking();
|
||||
auto vertices_to_check_v = std::vector<Vertex const *>{vertices_to_check.begin(), vertices_to_check.end()};
|
||||
storage_->constraints_.AbortEntries(vertices_to_check_v, transaction_.start_timestamp);
|
||||
}
|
||||
|
||||
const auto index_stats = storage_->indices_.Analysis();
|
||||
|
||||
for (const auto &delta : transaction_.deltas.use()) {
|
||||
auto prev = delta.prev.Get();
|
||||
switch (prev.type) {
|
||||
@ -845,6 +915,24 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
MG_ASSERT(it != vertex->labels.end(), "Invalid database state!");
|
||||
std::swap(*it, *vertex->labels.rbegin());
|
||||
vertex->labels.pop_back();
|
||||
|
||||
// For label index
|
||||
// check if there is a label index for the label and add entry if so
|
||||
// For property label index
|
||||
// check if we care about the label; this will return all the propertyIds we care about and then get
|
||||
// the current property value
|
||||
if (std::binary_search(index_stats.label.begin(), index_stats.label.end(), current->label)) {
|
||||
label_cleanup[current->label].emplace_back(vertex);
|
||||
}
|
||||
const auto &properties = index_stats.property_label.l2p.find(current->label);
|
||||
if (properties != index_stats.property_label.l2p.end()) {
|
||||
for (const auto &property : properties->second) {
|
||||
auto current_value = vertex->properties.GetProperty(property);
|
||||
if (!current_value.IsNull()) {
|
||||
label_property_cleanup[current->label].emplace_back(std::move(current_value), vertex);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_LABEL: {
|
||||
@ -854,6 +942,18 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
break;
|
||||
}
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
// For label index nothing
|
||||
// For property label index
|
||||
// check if we care about the property, this will return all the labels and then get current property
|
||||
// value
|
||||
const auto &labels = index_stats.property_label.p2l.find(current->property.key);
|
||||
if (labels != index_stats.property_label.p2l.end()) {
|
||||
auto current_value = vertex->properties.GetProperty(current->property.key);
|
||||
if (!current_value.IsNull()) {
|
||||
property_cleanup[current->property.key].emplace_back(std::move(current_value), vertex);
|
||||
}
|
||||
}
|
||||
// Setting the correct value
|
||||
vertex->properties.SetProperty(current->property.key, current->property.value);
|
||||
break;
|
||||
}
|
||||
@ -970,7 +1070,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
|
||||
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
||||
{
|
||||
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
|
||||
auto engine_guard = std::unique_lock(storage_->engine_lock_);
|
||||
uint64_t mark_timestamp = storage_->timestamp_;
|
||||
// Take garbage_undo_buffers lock while holding the engine lock to make
|
||||
// sure that entries are sorted by mark timestamp in the list.
|
||||
@ -982,10 +1082,37 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas),
|
||||
std::move(transaction_.commit_timestamp));
|
||||
});
|
||||
mem_storage->deleted_vertices_.WithLock(
|
||||
[&](auto &deleted_vertices) { deleted_vertices.splice(deleted_vertices.begin(), my_deleted_vertices); });
|
||||
mem_storage->deleted_edges_.WithLock(
|
||||
[&](auto &deleted_edges) { deleted_edges.splice(deleted_edges.begin(), my_deleted_edges); });
|
||||
|
||||
/// We MUST unlink (aka. remove) entries in indexes and constraints
|
||||
/// before we unlink (aka. remove) vertices from storage
|
||||
/// this is because they point into vertices skip_list
|
||||
|
||||
// INDICES
|
||||
for (auto const &[label, vertices] : label_cleanup) {
|
||||
storage_->indices_.AbortEntries(label, vertices, transaction_.start_timestamp);
|
||||
}
|
||||
for (auto const &[label, prop_vertices] : label_property_cleanup) {
|
||||
storage_->indices_.AbortEntries(label, prop_vertices, transaction_.start_timestamp);
|
||||
}
|
||||
for (auto const &[property, prop_vertices] : property_cleanup) {
|
||||
storage_->indices_.AbortEntries(property, prop_vertices, transaction_.start_timestamp);
|
||||
}
|
||||
|
||||
// VERTICES
|
||||
{
|
||||
auto vertices_acc = mem_storage->vertices_.access();
|
||||
for (auto gid : my_deleted_vertices) {
|
||||
vertices_acc.remove(gid);
|
||||
}
|
||||
}
|
||||
|
||||
// EDGES
|
||||
{
|
||||
auto edges_acc = mem_storage->edges_.access();
|
||||
for (auto gid : my_deleted_edges) {
|
||||
edges_acc.remove(gid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mem_storage->commit_log_->MarkFinished(transaction_.start_timestamp);
|
||||
@ -1278,8 +1405,6 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
// vertices that appear in an index also exist in main storage.
|
||||
std::list<Gid> current_deleted_edges;
|
||||
std::list<Gid> current_deleted_vertices;
|
||||
deleted_vertices_->swap(current_deleted_vertices);
|
||||
deleted_edges_->swap(current_deleted_edges);
|
||||
|
||||
auto const need_full_scan_vertices = gc_full_scan_vertices_delete_.exchange(false);
|
||||
auto const need_full_scan_edges = gc_full_scan_edges_delete_.exchange(false);
|
||||
@ -1929,12 +2054,12 @@ utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::Create
|
||||
void InMemoryStorage::FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) {
|
||||
CollectGarbage<true>(std::move(main_guard));
|
||||
|
||||
static_cast<InMemoryLabelIndex *>(indices_.label_index_.get())->RunGC();
|
||||
static_cast<InMemoryLabelPropertyIndex *>(indices_.label_property_index_.get())->RunGC();
|
||||
|
||||
// SkipList is already threadsafe
|
||||
vertices_.run_gc();
|
||||
edges_.run_gc();
|
||||
|
||||
static_cast<InMemoryLabelIndex *>(indices_.label_index_.get())->RunGC();
|
||||
static_cast<InMemoryLabelPropertyIndex *>(indices_.label_property_index_.get())->RunGC();
|
||||
}
|
||||
|
||||
uint64_t InMemoryStorage::CommitTimestamp(const std::optional<uint64_t> desired_commit_timestamp) {
|
||||
|
@ -51,6 +51,8 @@ class InMemoryStorage final : public Storage {
|
||||
friend std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit,
|
||||
utils::FileRetainer::FileLocker *file_locker,
|
||||
const InMemoryStorage *storage);
|
||||
friend class InMemoryLabelIndex;
|
||||
friend class InMemoryLabelPropertyIndex;
|
||||
|
||||
public:
|
||||
enum class CreateSnapshotError : uint8_t { DisabledForReplica, ReachedMaxNumTries };
|
||||
@ -185,6 +187,9 @@ class InMemoryStorage final : public Storage {
|
||||
/// @throw std::bad_alloc
|
||||
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override;
|
||||
|
||||
std::optional<EdgeAccessor> FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex,
|
||||
VertexAccessor *to_vertex) override;
|
||||
|
||||
Result<EdgeAccessor> EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override;
|
||||
|
||||
Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override;
|
||||
|
@ -256,11 +256,12 @@ bool InMemoryUniqueConstraints::Entry::operator==(const std::vector<PropertyValu
|
||||
|
||||
void InMemoryUniqueConstraints::UpdateBeforeCommit(const Vertex *vertex, const Transaction &tx) {
|
||||
for (const auto &label : vertex->labels) {
|
||||
if (!constraints_by_label_.contains(label)) {
|
||||
const auto &constraint = constraints_by_label_.find(label);
|
||||
if (constraint == constraints_by_label_.end()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto &[props, storage] : constraints_by_label_.at(label)) {
|
||||
for (auto &[props, storage] : constraint->second) {
|
||||
auto values = vertex->properties.ExtractPropertyValues(props);
|
||||
|
||||
if (!values) {
|
||||
@ -273,6 +274,28 @@ void InMemoryUniqueConstraints::UpdateBeforeCommit(const Vertex *vertex, const T
|
||||
}
|
||||
}
|
||||
|
||||
void InMemoryUniqueConstraints::AbortEntries(std::span<Vertex const *const> vertices, uint64_t exact_start_timestamp) {
|
||||
for (const auto &vertex : vertices) {
|
||||
for (const auto &label : vertex->labels) {
|
||||
const auto &constraint = constraints_by_label_.find(label);
|
||||
if (constraint == constraints_by_label_.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto &[props, storage] : constraint->second) {
|
||||
auto values = vertex->properties.ExtractPropertyValues(props);
|
||||
|
||||
if (!values) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto acc = storage->access();
|
||||
acc.remove(Entry{std::move(*values), vertex, exact_start_timestamp});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
utils::BasicResult<ConstraintViolation, InMemoryUniqueConstraints::CreationStatus>
|
||||
InMemoryUniqueConstraints::CreateConstraint(LabelId label, const std::set<PropertyId> &properties,
|
||||
utils::SkipList<Vertex>::Accessor vertices) {
|
||||
@ -364,12 +387,14 @@ std::optional<ConstraintViolation> InMemoryUniqueConstraints::Validate(const Ver
|
||||
if (vertex.deleted) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
for (const auto &label : vertex.labels) {
|
||||
if (!constraints_by_label_.contains(label)) {
|
||||
const auto &constraint = constraints_by_label_.find(label);
|
||||
if (constraint == constraints_by_label_.end()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (const auto &[properties, storage] : constraints_by_label_.at(label)) {
|
||||
for (const auto &[properties, storage] : constraint->second) {
|
||||
auto value_array = vertex.properties.ExtractPropertyValues(properties);
|
||||
|
||||
if (!value_array) {
|
||||
|
@ -11,6 +11,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <span>
|
||||
|
||||
#include "storage/v2/constraints/unique_constraints.hpp"
|
||||
|
||||
namespace memgraph::storage {
|
||||
@ -54,6 +56,8 @@ class InMemoryUniqueConstraints : public UniqueConstraints {
|
||||
void UpdateBeforeCommit(const Vertex *vertex, std::unordered_set<LabelId> &added_labels,
|
||||
std::unordered_set<PropertyId> &added_properties, const Transaction &tx);
|
||||
|
||||
void AbortEntries(std::span<Vertex const *const> vertices, uint64_t exact_start_timestamp);
|
||||
|
||||
/// Creates unique constraint on the given `label` and a list of `properties`.
|
||||
/// Returns constraint violation if there are multiple vertices with the same
|
||||
/// label and property values. Returns `CreationStatus::ALREADY_EXISTS` if
|
||||
|
@ -196,6 +196,9 @@ class Storage {
|
||||
|
||||
virtual Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) = 0;
|
||||
|
||||
virtual std::optional<EdgeAccessor> FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex,
|
||||
VertexAccessor *to_vertex) = 0;
|
||||
|
||||
virtual Result<EdgeAccessor> EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) = 0;
|
||||
|
||||
virtual Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) = 0;
|
||||
|
@ -16,11 +16,11 @@
|
||||
#include <list>
|
||||
#include <thread>
|
||||
|
||||
constexpr char *kProcedureHackerNews = "hacker_news";
|
||||
constexpr char *kArgumentHackerNewsVotes = "votes";
|
||||
constexpr char *kArgumentHackerNewsItemHourAge = "item_hour_age";
|
||||
constexpr char *kArgumentHackerNewsGravity = "gravity";
|
||||
constexpr char *kReturnHackerNewsScore = "score";
|
||||
constexpr char const *kProcedureHackerNews = "hacker_news";
|
||||
constexpr char const *kArgumentHackerNewsVotes = "votes";
|
||||
constexpr char const *kArgumentHackerNewsItemHourAge = "item_hour_age";
|
||||
constexpr char const *kArgumentHackerNewsGravity = "gravity";
|
||||
constexpr char const *kReturnHackerNewsScore = "score";
|
||||
|
||||
void HackerNews(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
|
||||
mgp::MemoryDispatcherGuard guard(memory);
|
||||
|
@ -13,6 +13,7 @@ copy_e2e_python_files(replication_show common.py)
|
||||
copy_e2e_python_files(replication_show conftest.py)
|
||||
copy_e2e_python_files(replication_show show.py)
|
||||
copy_e2e_python_files(replication_show show_while_creating_invalid_state.py)
|
||||
copy_e2e_python_files(replication_show edge_delete.py)
|
||||
copy_e2e_python_files_from_parent_folder(replication_show ".." memgraph.py)
|
||||
copy_e2e_python_files_from_parent_folder(replication_show ".." interactive_mg_runner.py)
|
||||
copy_e2e_python_files_from_parent_folder(replication_show ".." mg_utils.py)
|
||||
|
56
tests/e2e/replication/edge_delete.py
Executable file
56
tests/e2e/replication/edge_delete.py
Executable file
@ -0,0 +1,56 @@
|
||||
# Copyright 2022 Memgraph Ltd.
|
||||
#
|
||||
# Use of this software is governed by the Business Source License
|
||||
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
# License, and you may not use this file except in compliance with the Business Source License.
|
||||
#
|
||||
# As of the Change Date specified in that file, in accordance with
|
||||
# the Business Source License, use of this software will be governed
|
||||
# by the Apache License, Version 2.0, included in the file
|
||||
# licenses/APL.txt.
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from common import execute_and_fetch_all
|
||||
from mg_utils import mg_sleep_and_assert
|
||||
|
||||
|
||||
# BUGFIX: for issue https://github.com/memgraph/memgraph/issues/1515
|
||||
def test_replication_handles_delete_when_multiple_edges_of_same_type(connection):
|
||||
# Goal is to check the timestamp are correctly computed from the information we get from replicas.
|
||||
# 0/ Check original state of replicas.
|
||||
# 1/ Add nodes and edges to MAIN, then delete the edges.
|
||||
# 2/ Check state of replicas.
|
||||
|
||||
# 0/
|
||||
conn = connection(7687, "main")
|
||||
conn.autocommit = True
|
||||
cursor = conn.cursor()
|
||||
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
|
||||
|
||||
expected_data = {
|
||||
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
|
||||
("replica_2", "127.0.0.1:10002", "async", 0, 0, "ready"),
|
||||
}
|
||||
assert actual_data == expected_data
|
||||
|
||||
# 1/
|
||||
execute_and_fetch_all(cursor, "CREATE (a)-[r:X]->(b) CREATE (a)-[:X]->(b) DELETE r;")
|
||||
|
||||
# 2/
|
||||
expected_data = {
|
||||
("replica_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
|
||||
("replica_2", "127.0.0.1:10002", "async", 2, 0, "ready"),
|
||||
}
|
||||
|
||||
def retrieve_data():
|
||||
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
|
||||
|
||||
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
|
||||
assert actual_data == expected_data
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main([__file__, "-rA"]))
|
@ -8,6 +8,23 @@ template_validation_queries: &template_validation_queries
|
||||
validation_queries:
|
||||
- <<: *template_test_nodes_query
|
||||
- <<: *template_test_edges_query
|
||||
template_simple_cluster: &template_simple_cluster
|
||||
cluster:
|
||||
replica_1:
|
||||
args: [ "--bolt-port", "7688", "--log-level=TRACE" ]
|
||||
log_file: "replication-e2e-replica1.log"
|
||||
setup_queries: [ "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;" ]
|
||||
replica_2:
|
||||
args: ["--bolt-port", "7689", "--log-level=TRACE"]
|
||||
log_file: "replication-e2e-replica2.log"
|
||||
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"]
|
||||
main:
|
||||
args: ["--bolt-port", "7687", "--log-level=TRACE"]
|
||||
log_file: "replication-e2e-main.log"
|
||||
setup_queries: [
|
||||
"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'",
|
||||
"REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:10002'",
|
||||
]
|
||||
template_cluster: &template_cluster
|
||||
cluster:
|
||||
replica_1:
|
||||
@ -83,3 +100,8 @@ workloads:
|
||||
- name: "Show while creating invalid state"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["replication/show_while_creating_invalid_state.py"]
|
||||
|
||||
- name: "Delete edge replication"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["replication/edge_delete.py"]
|
||||
<<: *template_simple_cluster
|
||||
|
@ -699,3 +699,75 @@ Feature: Match
|
||||
Then the result should be
|
||||
| date(n.time) |
|
||||
| 2021-10-05 |
|
||||
|
||||
Scenario: Variable expand with filter by size of accumulated path
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:Person {id: 1})-[:KNOWS]->(:Person {id: 2})-[:KNOWS]->(:Person {id: 3})-[:KNOWS]->(:Person {id: 4});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path = (:Person {id: 1})-[* (e, n, p | size(p) < 4)]->(:Person {id: 4}) RETURN path
|
||||
"""
|
||||
Then the result should be
|
||||
| path |
|
||||
| <(:Person{id:1})-[:KNOWS]->(:Person{id:2})-[:KNOWS]->(:Person{id:3})-[:KNOWS]->(:Person{id:4})> |
|
||||
|
||||
Scenario: Variable expand with filter by last edge type of accumulated path
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:Person {id: 1})-[:KNOWS]->(:Person {id: 2})-[:KNOWS]->(:Person {id: 3})-[:KNOWS]->(:Person {id: 4});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path = (:Person {id: 1})-[* (e, n, p | type(relationships(p)[-1]) = 'KNOWS')]->(:Person {id: 4}) RETURN path
|
||||
"""
|
||||
Then the result should be
|
||||
| path |
|
||||
| <(:Person{id:1})-[:KNOWS]->(:Person{id:2})-[:KNOWS]->(:Person{id:3})-[:KNOWS]->(:Person{id:4})> |
|
||||
|
||||
Scenario: Variable expand with too restricted filter by size of accumulated path
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:Person {id: 1})-[:KNOWS]->(:Person {id: 2})-[:KNOWS]->(:Person {id: 3})-[:KNOWS]->(:Person {id: 4});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path = (:Person {id: 1})-[* (e, n, p | size(p) < 3)]->(:Person {id: 4}) RETURN path
|
||||
"""
|
||||
Then the result should be empty
|
||||
|
||||
Scenario: Variable expand with too restricted filter by last edge type of accumulated path
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:Person {id: 1})-[:KNOWS]->(:Person {id: 2})-[:KNOWS]->(:Person {id: 3})-[:KNOWS]->(:Person {id: 4});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path = (:Person {id: 1})-[* (e, n, p | type(relationships(p)[-1]) = 'Invalid')]->(:Person {id: 4}) RETURN path
|
||||
"""
|
||||
Then the result should be empty
|
||||
|
||||
Scenario: Test DFS variable expand with filter by edge type1
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[* (e, n, p | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1'))]->(:label3) RETURN path;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path |
|
||||
| <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> |
|
||||
|
||||
Scenario: Test DFS variable expand with filter by edge type2
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[* (e, n, p | NOT(type(e)='type2' AND type(last(relationships(p))) = 'type2'))]->(:label3) RETURN path;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path |
|
||||
| <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})> |
|
||||
|
@ -203,3 +203,103 @@ Feature: All Shortest Path
|
||||
Then the result should be:
|
||||
| total_cost |
|
||||
| 20.3 |
|
||||
|
||||
Scenario: Test match AllShortest with accumulated path filtered by order of ids
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})-[:type1 {id: 3}]->(:label4 {id: 4});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH pth=(:label1)-[*ALLSHORTEST (r, n | r.id) total_weight (e,n,p | e.id > 0 and (nodes(p)[-1]).id > (nodes(p)[-2]).id)]->(:label4) RETURN pth, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| pth | total_weight |
|
||||
| <(:label1{id:1})-[:type1{id:1}]->(:label2{id:2})-[:type1{id:2}]->(:label3{id:3})-[:type1{id:3}]->(:label4{id:4})> | 6 |
|
||||
|
||||
Scenario: Test match AllShortest with accumulated path filtered by edge type1
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*ALLSHORTEST (r, n | r.id) total_weight (e, n, p | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1'))]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 10 |
|
||||
|
||||
Scenario: Test match AllShortest with accumulated path filtered by edge type2
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*ALLSHORTEST (r, n | r.id) total_weight (e, n, p | NOT(type(e)='type2' AND type(last(relationships(p))) = 'type2'))]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})> | 3 |
|
||||
|
||||
Scenario: Test match AllShortest with accumulated path filtered by edge type1 and accumulated weight based on edge
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*ALLSHORTEST (r, n | r.id) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 10 |
|
||||
|
||||
Scenario: Test match AllShortest with accumulated path filtered by edge type1 and accumulated weight based on edge too restricted
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*ALLSHORTEST (r, n | r.id) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w < 10)]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be empty
|
||||
|
||||
Scenario: Test match AllShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex is int
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*ALLSHORTEST (r, n | n.id) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 4 |
|
||||
|
||||
Scenario: Test match allShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex and edge are ints
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*ALLSHORTEST (r, n | n.id + coalesce(r.id, 0)) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 14 |
|
||||
|
||||
Scenario: Test match AllShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex and edge are doubles
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:label1 {id: 1})-[:type1 {id:1.5}]->(:label2 {id: 2})-[:type1 {id: 2.1}]->(:label3 {id: 3})-[:type1 {id: 3.4}]->(:label4 {id: 4});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*ALLSHORTEST (r, n | n.id + coalesce(r.id, 0)) total_weight (e, n, p, w | w > 0)]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type1 {id: 1.5}]->(:label2 {id: 2})-[:type1 {id: 2.1}]->(:label3 {id: 3})> | 9.6 |
|
||||
|
||||
Scenario: Test match AllShortest with accumulated path filtered by order of ids and accumulated weight based on both vertex and edge is duration
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:station {name: "A", arrival: localTime("08:00"), departure: localTime("08:15")})-[:ride {id: 1, duration: duration("PT1H5M")}]->(:station {name: "B", arrival: localtime("09:20"), departure: localTime("09:30")})-[:ride {id: 2, duration: duration("PT30M")}]->(:station {name: "C", arrival: localTime("10:00"), departure: localTime("10:20")});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:station {name:"A"})-[*ALLSHORTEST (r, v | v.departure - v.arrival + coalesce(r.duration, duration("PT0M"))) total_weight (r,n,p,w | (nodes(p)[-1]).name > (nodes(p)[-2]).name AND not(w is null))]->(:station {name:"C"}) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:station {arrival: 08:00:00.000000000, departure: 08:15:00.000000000, name: 'A'})-[:ride {duration: PT1H5M, id: 1}]->(:station {arrival: 09:20:00.000000000, departure: 09:30:00.000000000, name: 'B'})-[:ride {duration: PT30M, id: 2}]->(:station {arrival: 10:00:00.000000000, departure: 10:20:00.000000000, name: 'C'})> | PT2H20M |
|
||||
|
@ -121,3 +121,95 @@ Feature: Bfs
|
||||
Then the result should be:
|
||||
| p |
|
||||
| <(:Node {id: 2})-[:LINK {date: '2023-03'}]->(:Node {id: 3})> |
|
||||
|
||||
Scenario: Test BFS variable expand with filter by last edge type of accumulated path
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH pth=(:label1)-[*BFS (e,n,p | type(relationships(p)[-1]) = 'type1')]->(:label3) return pth;
|
||||
"""
|
||||
Then the result should be:
|
||||
| pth |
|
||||
| <(:label1{id:1})-[:type1{id:1}]->(:label2{id:2})-[:type1{id:2}]->(:label3{id:3})> |
|
||||
|
||||
Scenario: Test BFS variable expand with restict filter by last edge type of accumulated path
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH pth=(:label1)-[*BFS (e,n,p | type(relationships(p)[-1]) = 'type2')]->(:label2) return pth;
|
||||
"""
|
||||
Then the result should be empty
|
||||
|
||||
Scenario: Test BFS variable expand with filter by size of accumulated path
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH pth=(:label1)-[*BFS (e,n,p | size(p) < 3)]->(:label3) return pth;
|
||||
"""
|
||||
Then the result should be:
|
||||
| pth |
|
||||
| <(:label1{id:1})-[:type1{id:1}]->(:label2{id:2})-[:type1{id:2}]->(:label3{id:3})> |
|
||||
|
||||
Scenario: Test BFS variable expand with restict filter by size of accumulated path
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH pth=(:label1)-[*BFS (e,n,p | size(p) < 2)]->(:label3) return pth;
|
||||
"""
|
||||
Then the result should be empty
|
||||
|
||||
Scenario: Test BFS variable expand with filter by order of ids in accumulated path when target vertex is indexed
|
||||
Given graph "graph_index"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH pth=(:label1)-[*BFS (e,n,p | (nodes(p)[-1]).id > (nodes(p)[-2]).id)]->(:label4) return pth;
|
||||
"""
|
||||
Then the result should be:
|
||||
| pth |
|
||||
| <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})-[:type1 {id: 3}]->(:label4 {id: 4})> |
|
||||
|
||||
Scenario: Test BFS variable expand with filter by order of ids in accumulated path when target vertex is NOT indexed
|
||||
Given graph "graph_index"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH pth=(:label1)-[*BFS (e,n,p | (nodes(p)[-1]).id > (nodes(p)[-2]).id)]->(:label3) return pth;
|
||||
"""
|
||||
Then the result should be:
|
||||
| pth |
|
||||
| <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})> |
|
||||
|
||||
Scenario: Test BFS variable expand with filter by edge type1
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*BFS (e, n, p | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1'))]->(:label3) RETURN path;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path |
|
||||
| <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> |
|
||||
|
||||
Scenario: Test BFS variable expand with filter by edge type2
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*BFS (e, n, p | NOT(type(e)='type2' AND type(last(relationships(p))) = 'type2'))]->(:label3) RETURN path;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path |
|
||||
| <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})> |
|
||||
|
@ -155,3 +155,103 @@ Feature: Weighted Shortest Path
|
||||
MATCH (n {a:'0'})-[le *wShortest 10 (e, n | e.w ) w]->(m) RETURN m.a, size(le) as s, w
|
||||
"""
|
||||
Then an error should be raised
|
||||
|
||||
Scenario: Test match wShortest with accumulated path filtered by order of ids
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})-[:type1 {id: 3}]->(:label4 {id: 4});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH pth=(:label1)-[*WSHORTEST (r, n | r.id) total_weight (e,n,p | e.id > 0 and (nodes(p)[-1]).id > (nodes(p)[-2]).id)]->(:label4) RETURN pth, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| pth | total_weight |
|
||||
| <(:label1{id:1})-[:type1{id:1}]->(:label2{id:2})-[:type1{id:2}]->(:label3{id:3})-[:type1{id:3}]->(:label4{id:4})> | 6 |
|
||||
|
||||
Scenario: Test match wShortest with accumulated path filtered by edge type1
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*WSHORTEST (r, n | r.id) total_weight (e, n, p | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1'))]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 10 |
|
||||
|
||||
Scenario: Test match wShortest with accumulated path filtered by edge type2
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*WSHORTEST (r, n | r.id) total_weight (e, n, p | NOT(type(e)='type2' AND type(last(relationships(p))) = 'type2'))]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})> | 3 |
|
||||
|
||||
Scenario: Test match wShortest with accumulated path filtered by edge type1 and accumulated weight based on edge
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*WSHORTEST (r, n | r.id) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 10 |
|
||||
|
||||
Scenario: Test match wShortest with accumulated path filtered by edge type1 and accumulated weight based on edge too restricted
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*WSHORTEST (r, n | r.id) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w < 10)]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be empty
|
||||
|
||||
Scenario: Test match wShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex is int
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*WSHORTEST (r, n | n.id) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 4 |
|
||||
|
||||
Scenario: Test match wShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex and edge are ints
|
||||
Given graph "graph_edges"
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*WSHORTEST (r, n | n.id + coalesce(r.id, 0)) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 14 |
|
||||
|
||||
Scenario: Test match wShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex and edge are doubles
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:label1 {id: 1})-[:type1 {id:1.5}]->(:label2 {id: 2})-[:type1 {id: 2.1}]->(:label3 {id: 3})-[:type1 {id: 3.4}]->(:label4 {id: 4});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:label1)-[*WSHORTEST (r, n | n.id + coalesce(r.id, 0)) total_weight (e, n, p, w | w > 0)]->(:label3) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:label1 {id: 1})-[:type1 {id: 1.5}]->(:label2 {id: 2})-[:type1 {id: 2.1}]->(:label3 {id: 3})> | 9.6 |
|
||||
|
||||
Scenario: Test match wShortest with accumulated path filtered by order of ids and accumulated weight based on both vertex and edge is duration
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
"""
|
||||
CREATE (:station {name: "A", arrival: localTime("08:00"), departure: localTime("08:15")})-[:ride {id: 1, duration: duration("PT1H5M")}]->(:station {name: "B", arrival: localtime("09:20"), departure: localTime("09:30")})-[:ride {id: 2, duration: duration("PT30M")}]->(:station {name: "C", arrival: localTime("10:00"), departure: localTime("10:20")});
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH path=(:station {name:"A"})-[*WSHORTEST (r, v | v.departure - v.arrival + coalesce(r.duration, duration("PT0M"))) total_weight (r,n,p,w | (nodes(p)[-1]).name > (nodes(p)[-2]).name AND not(w is null))]->(:station {name:"C"}) RETURN path, total_weight;
|
||||
"""
|
||||
Then the result should be:
|
||||
| path | total_weight |
|
||||
| <(:station {arrival: 08:00:00.000000000, departure: 08:15:00.000000000, name: 'A'})-[:ride {duration: PT1H5M, id: 1}]->(:station {arrival: 09:20:00.000000000, departure: 09:30:00.000000000, name: 'B'})-[:ride {duration: PT30M, id: 2}]->(:station {arrival: 10:00:00.000000000, departure: 10:20:00.000000000, name: 'C'})> | PT2H20M |
|
||||
|
@ -0,0 +1,2 @@
|
||||
CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})-[:type1 {id: 3}]->(:label4 {id: 4});
|
||||
MATCH (n :label1), (m :label3) CREATE (n)-[:type2 {id: 10}]->(m);
|
@ -0,0 +1,2 @@
|
||||
CREATE INDEX ON :label4;
|
||||
CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})-[:type1 {id:3}]->(:label4 {id: 4});
|
@ -1925,6 +1925,41 @@ TEST_P(CypherMainVisitorTest, MatchBfsReturn) {
|
||||
ASSERT_TRUE(eq);
|
||||
}
|
||||
|
||||
TEST_P(CypherMainVisitorTest, MatchBfsFilterByPathReturn) {
|
||||
auto &ast_generator = *GetParam();
|
||||
{
|
||||
const auto *query = dynamic_cast<CypherQuery *>(
|
||||
ast_generator.ParseQuery("MATCH pth=(r:type1 {id: 1})<-[*BFS ..10 (e, n, p | startNode(relationships(e)[-1]) = "
|
||||
"c:type2)]->(:type3 {id: 3}) RETURN pth;"));
|
||||
ASSERT_TRUE(query);
|
||||
ASSERT_TRUE(query->single_query_);
|
||||
const auto *match = dynamic_cast<Match *>(query->single_query_->clauses_[0]);
|
||||
ASSERT_TRUE(match);
|
||||
ASSERT_EQ(match->patterns_.size(), 1U);
|
||||
ASSERT_EQ(match->patterns_[0]->atoms_.size(), 3U);
|
||||
auto *bfs = dynamic_cast<EdgeAtom *>(match->patterns_[0]->atoms_[1]);
|
||||
ASSERT_TRUE(bfs);
|
||||
EXPECT_TRUE(bfs->IsVariable());
|
||||
EXPECT_EQ(bfs->filter_lambda_.inner_edge->name_, "e");
|
||||
EXPECT_TRUE(bfs->filter_lambda_.inner_edge->user_declared_);
|
||||
EXPECT_EQ(bfs->filter_lambda_.inner_node->name_, "n");
|
||||
EXPECT_TRUE(bfs->filter_lambda_.inner_node->user_declared_);
|
||||
EXPECT_EQ(bfs->filter_lambda_.accumulated_path->name_, "p");
|
||||
EXPECT_TRUE(bfs->filter_lambda_.accumulated_path->user_declared_);
|
||||
EXPECT_EQ(bfs->filter_lambda_.accumulated_weight, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(CypherMainVisitorTest, SemanticExceptionOnBfsFilterByWeight) {
|
||||
auto &ast_generator = *GetParam();
|
||||
{
|
||||
ASSERT_THROW(ast_generator.ParseQuery(
|
||||
"MATCH pth=(:type1 {id: 1})<-[*BFS ..10 (e, n, p, w | startNode(relationships(e)[-1] AND w > 0) = "
|
||||
"c:type2)]->(:type3 {id: 3}) RETURN pth;"),
|
||||
SemanticException);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(CypherMainVisitorTest, MatchVariableLambdaSymbols) {
|
||||
auto &ast_generator = *GetParam();
|
||||
auto *query = dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("MATCH () -[*]- () RETURN *"));
|
||||
@ -1981,6 +2016,57 @@ TEST_P(CypherMainVisitorTest, MatchWShortestReturn) {
|
||||
EXPECT_TRUE(shortest->total_weight_->user_declared_);
|
||||
}
|
||||
|
||||
TEST_P(CypherMainVisitorTest, MatchWShortestFilterByPathReturn) {
|
||||
auto &ast_generator = *GetParam();
|
||||
{
|
||||
const auto *query = dynamic_cast<CypherQuery *>(
|
||||
ast_generator.ParseQuery("MATCH pth=()-[r:type1 *wShortest 10 (we, wn | 42) total_weight "
|
||||
"(e, n, p | startNode(relationships(e)[-1]) = c:type3)]->(:type2) RETURN pth"));
|
||||
ASSERT_TRUE(query);
|
||||
ASSERT_TRUE(query->single_query_);
|
||||
const auto *match = dynamic_cast<Match *>(query->single_query_->clauses_[0]);
|
||||
ASSERT_TRUE(match);
|
||||
ASSERT_EQ(match->patterns_.size(), 1U);
|
||||
ASSERT_EQ(match->patterns_[0]->atoms_.size(), 3U);
|
||||
auto *shortestPath = dynamic_cast<EdgeAtom *>(match->patterns_[0]->atoms_[1]);
|
||||
ASSERT_TRUE(shortestPath);
|
||||
EXPECT_TRUE(shortestPath->IsVariable());
|
||||
EXPECT_EQ(shortestPath->filter_lambda_.inner_edge->name_, "e");
|
||||
EXPECT_TRUE(shortestPath->filter_lambda_.inner_edge->user_declared_);
|
||||
EXPECT_EQ(shortestPath->filter_lambda_.inner_node->name_, "n");
|
||||
EXPECT_TRUE(shortestPath->filter_lambda_.inner_node->user_declared_);
|
||||
EXPECT_EQ(shortestPath->filter_lambda_.accumulated_path->name_, "p");
|
||||
EXPECT_TRUE(shortestPath->filter_lambda_.accumulated_path->user_declared_);
|
||||
EXPECT_EQ(shortestPath->filter_lambda_.accumulated_weight, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(CypherMainVisitorTest, MatchWShortestFilterByPathWeightReturn) {
|
||||
auto &ast_generator = *GetParam();
|
||||
{
|
||||
const auto *query = dynamic_cast<CypherQuery *>(ast_generator.ParseQuery(
|
||||
"MATCH pth=()-[r:type1 *wShortest 10 (we, wn | 42) total_weight "
|
||||
"(e, n, p, w | startNode(relationships(e)[-1]) = c:type3 AND w < 50)]->(:type2) RETURN pth"));
|
||||
ASSERT_TRUE(query);
|
||||
ASSERT_TRUE(query->single_query_);
|
||||
const auto *match = dynamic_cast<Match *>(query->single_query_->clauses_[0]);
|
||||
ASSERT_TRUE(match);
|
||||
ASSERT_EQ(match->patterns_.size(), 1U);
|
||||
ASSERT_EQ(match->patterns_[0]->atoms_.size(), 3U);
|
||||
auto *shortestPath = dynamic_cast<EdgeAtom *>(match->patterns_[0]->atoms_[1]);
|
||||
ASSERT_TRUE(shortestPath);
|
||||
EXPECT_TRUE(shortestPath->IsVariable());
|
||||
EXPECT_EQ(shortestPath->filter_lambda_.inner_edge->name_, "e");
|
||||
EXPECT_TRUE(shortestPath->filter_lambda_.inner_edge->user_declared_);
|
||||
EXPECT_EQ(shortestPath->filter_lambda_.inner_node->name_, "n");
|
||||
EXPECT_TRUE(shortestPath->filter_lambda_.inner_node->user_declared_);
|
||||
EXPECT_EQ(shortestPath->filter_lambda_.accumulated_path->name_, "p");
|
||||
EXPECT_TRUE(shortestPath->filter_lambda_.accumulated_path->user_declared_);
|
||||
EXPECT_EQ(shortestPath->filter_lambda_.accumulated_weight->name_, "w");
|
||||
EXPECT_TRUE(shortestPath->filter_lambda_.accumulated_weight->user_declared_);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(CypherMainVisitorTest, MatchWShortestNoFilterReturn) {
|
||||
auto &ast_generator = *GetParam();
|
||||
auto *query =
|
||||
|
Loading…
Reference in New Issue
Block a user