Compare commits

...

19 Commits

Author SHA1 Message Date
gvolfing
61b76044c1 Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-25 08:21:16 +01:00
gvolfing
0a9d6475c0 Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-22 16:01:13 +01:00
gvolfing
878b6cfe4f Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-22 11:42:02 +01:00
gvolfing
6b7e1d8a9f Assert only if metadata flag is enabled 2024-03-22 11:41:02 +01:00
gvolfing
6f18117b0a Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-20 11:39:36 +01:00
gvolfing
da1189b960 Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-19 16:21:50 +01:00
gvolfing
494b4d2d83 Undo outcommented tests 2024-03-19 16:21:16 +01:00
gvolfing
527d4c864f React to PR comments 2024-03-19 16:18:23 +01:00
gvolfing
71fdace31d Generic refactor 2024-03-18 12:52:31 +01:00
gvolfing
9dff306782 Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-18 11:10:21 +01:00
gvolfing
5d233c2486 Conform cpp-api when changing edges on the fly 2024-03-18 11:02:43 +01:00
gvolfing
db6b1fc265 Implement edge-visibility checking 2024-03-18 09:48:00 +01:00
gvolfing
9825bdb41b Add snapshot based recovery capabilites 2024-03-15 15:59:01 +01:00
gvolfing
4f6d8c10ec Add metadata flag + runtime check during id scan 2024-03-15 13:56:43 +01:00
gvolfing
96651170a0 Add query plan test 2024-03-12 08:50:51 +01:00
gvolfing
a5ee0fad08 Enhance planner for edge-id indexing 2024-03-11 19:39:32 +01:00
gvolfing
956705293e Add logical operator for id based edge scans 2024-03-11 09:56:00 +01:00
gvolfing
5167c6ee37 Add temporary edge retrieval code 2024-03-10 19:39:30 +01:00
gvolfing
a58f2887b3 Create storage level call-chain 2024-03-10 18:56:15 +01:00
34 changed files with 490 additions and 87 deletions

View File

@ -119,6 +119,10 @@ modifications:
value: "false" value: "false"
override: true override: true
- name: "storage_enable_edges_metadata"
value: "false"
override: true
- name: "query_callable_mappings_path" - name: "query_callable_mappings_path"
value: "/etc/memgraph/apoc_compatibility_mappings.json" value: "/etc/memgraph/apoc_compatibility_mappings.json"
override: true override: true

View File

@ -284,8 +284,8 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
try { try {
spdlog::debug("Loading snapshot"); spdlog::debug("Loading snapshot");
auto recovered_snapshot = storage::durability::LoadSnapshot( auto recovered_snapshot = storage::durability::LoadSnapshot(
*maybe_snapshot_path, &storage->vertices_, &storage->edges_, &storage->repl_storage_state_.history, *maybe_snapshot_path, &storage->vertices_, &storage->edges_, &storage->edges_metadata_,
storage->name_id_mapper_.get(), &storage->edge_count_, storage->config_); &storage->repl_storage_state_.history, storage->name_id_mapper_.get(), &storage->edge_count_, storage->config_);
spdlog::debug("Snapshot loaded successfully"); spdlog::debug("Snapshot loaded successfully");
// If this step is present it should always be the first step of // If this step is present it should always be the first step of
// the recovery so we use the UUID we read from snasphost // the recovery so we use the UUID we read from snasphost

View File

@ -131,6 +131,11 @@ DEFINE_uint64(storage_recovery_thread_count,
DEFINE_bool(storage_enable_schema_metadata, false, DEFINE_bool(storage_enable_schema_metadata, false,
"Controls whether metadata should be collected about the resident labels and edge types."); "Controls whether metadata should be collected about the resident labels and edge types.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(storage_enable_edges_metadata, false,
"Controls whether additional metadata should be stored about the edges in order to do faster traversals on "
"certain queries.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(storage_delta_on_identical_property_update, true, DEFINE_bool(storage_delta_on_identical_property_update, true,
"Controls whether updating a property with the same value should create a delta object."); "Controls whether updating a property with the same value should create a delta object.");

View File

@ -85,6 +85,8 @@ DECLARE_uint64(storage_recovery_thread_count);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_bool(storage_enable_schema_metadata); DECLARE_bool(storage_enable_schema_metadata);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_bool(storage_enable_edges_metadata);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_bool(storage_delta_on_identical_property_update); DECLARE_bool(storage_delta_on_identical_property_update);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -332,9 +332,16 @@ int main(int argc, char **argv) {
.durability_directory = FLAGS_data_directory + "/rocksdb_durability", .durability_directory = FLAGS_data_directory + "/rocksdb_durability",
.wal_directory = FLAGS_data_directory + "/rocksdb_wal"}, .wal_directory = FLAGS_data_directory + "/rocksdb_wal"},
.salient.items = {.properties_on_edges = FLAGS_storage_properties_on_edges, .salient.items = {.properties_on_edges = FLAGS_storage_properties_on_edges,
.enable_edges_metadata =
FLAGS_storage_properties_on_edges ? FLAGS_storage_enable_edges_metadata : false,
.enable_schema_metadata = FLAGS_storage_enable_schema_metadata, .enable_schema_metadata = FLAGS_storage_enable_schema_metadata,
.delta_on_identical_property_update = FLAGS_storage_delta_on_identical_property_update}, .delta_on_identical_property_update = FLAGS_storage_delta_on_identical_property_update},
.salient.storage_mode = memgraph::flags::ParseStorageMode()}; .salient.storage_mode = memgraph::flags::ParseStorageMode()};
if (!FLAGS_storage_properties_on_edges && FLAGS_storage_enable_edges_metadata) {
spdlog::warn(
"Properties on edges were not enabled, hence edges metadata will also be disabled. If you wish to utilize "
"extra metadata on edges, enable properties on edges as well.");
}
spdlog::info("config recover on startup {}, flags {} {}", db_config.durability.recover_on_startup, spdlog::info("config recover on startup {}, flags {} {}", db_config.durability.recover_on_startup,
FLAGS_storage_recover_on_startup, FLAGS_data_recovery_on_startup); FLAGS_storage_recover_on_startup, FLAGS_data_recovery_on_startup);
memgraph::utils::Scheduler jemalloc_purge_scheduler; memgraph::utils::Scheduler jemalloc_purge_scheduler;

View File

@ -716,8 +716,7 @@ int main(int argc, char *argv[]) {
.recover_on_startup = false, .recover_on_startup = false,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::DISABLED, .snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::DISABLED,
.snapshot_on_exit = true}, .snapshot_on_exit = true},
.salient = {.items = {.properties_on_edges = FLAGS_storage_properties_on_edges}}, .salient = {.items = {.properties_on_edges = FLAGS_storage_properties_on_edges}}};
};
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)}; memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
auto store = memgraph::dbms::CreateInMemoryStorage(config, repl_state); auto store = memgraph::dbms::CreateInMemoryStorage(config, repl_state);

View File

@ -439,6 +439,12 @@ class DbAccessor final {
return std::nullopt; return std::nullopt;
} }
std::optional<EdgeAccessor> FindEdge(storage::Gid gid, storage::View view) {
auto maybe_edge = accessor_->FindEdge(gid, view);
if (maybe_edge) return EdgeAccessor(*maybe_edge);
return std::nullopt;
}
void FinalizeTransaction() { accessor_->FinalizeTransaction(); } void FinalizeTransaction() { accessor_->FinalizeTransaction(); }
void TrackCurrentThreadAllocations() { void TrackCurrentThreadAllocations() {
@ -813,6 +819,8 @@ class SubgraphDbAccessor final {
std::optional<VertexAccessor> FindVertex(storage::Gid gid, storage::View view); std::optional<VertexAccessor> FindVertex(storage::Gid gid, storage::View view);
std::optional<EdgeAccessor> FindEdge(storage::Gid gid, storage::View view);
Graph *getGraph(); Graph *getGraph();
storage::StorageMode GetStorageMode() const noexcept; storage::StorageMode GetStorageMode() const noexcept;

View File

@ -117,6 +117,9 @@ class PlanHintsProvider final : public HierarchicalLogicalOperatorVisitor {
bool PreVisit(ScanAllByEdgeType & /*unused*/) override { return true; } bool PreVisit(ScanAllByEdgeType & /*unused*/) override { return true; }
bool PostVisit(ScanAllByEdgeType & /*unused*/) override { return true; } bool PostVisit(ScanAllByEdgeType & /*unused*/) override { return true; }
bool PreVisit(ScanAllByEdgeId & /*unused*/) override { return true; }
bool PostVisit(ScanAllByEdgeId & /*unused*/) override { return true; }
bool PreVisit(ConstructNamedPath & /*unused*/) override { return true; } bool PreVisit(ConstructNamedPath & /*unused*/) override { return true; }
bool PostVisit(ConstructNamedPath & /*unused*/) override { return true; } bool PostVisit(ConstructNamedPath & /*unused*/) override { return true; }

View File

@ -110,6 +110,7 @@ extern const Event ScanAllByLabelPropertyValueOperator;
extern const Event ScanAllByLabelPropertyOperator; extern const Event ScanAllByLabelPropertyOperator;
extern const Event ScanAllByIdOperator; extern const Event ScanAllByIdOperator;
extern const Event ScanAllByEdgeTypeOperator; extern const Event ScanAllByEdgeTypeOperator;
extern const Event ScanAllByEdgeIdOperator;
extern const Event ExpandOperator; extern const Event ExpandOperator;
extern const Event ExpandVariableOperator; extern const Event ExpandVariableOperator;
extern const Event ConstructNamedPathOperator; extern const Event ConstructNamedPathOperator;
@ -544,7 +545,7 @@ class ScanAllCursor : public Cursor {
template <typename TEdgesFun> template <typename TEdgesFun>
class ScanAllByEdgeTypeCursor : public Cursor { class ScanAllByEdgeTypeCursor : public Cursor {
public: public:
explicit ScanAllByEdgeTypeCursor(const ScanAllByEdgeType &self, Symbol output_symbol, UniqueCursorPtr input_cursor, explicit ScanAllByEdgeTypeCursor(const ScanAll &self, Symbol output_symbol, UniqueCursorPtr input_cursor,
storage::View view, TEdgesFun get_edges, const char *op_name) storage::View view, TEdgesFun get_edges, const char *op_name)
: self_(self), : self_(self),
output_symbol_(std::move(output_symbol)), output_symbol_(std::move(output_symbol)),
@ -584,7 +585,7 @@ class ScanAllByEdgeTypeCursor : public Cursor {
} }
private: private:
const ScanAllByEdgeType &self_; const ScanAll &self_;
const Symbol output_symbol_; const Symbol output_symbol_;
const UniqueCursorPtr input_cursor_; const UniqueCursorPtr input_cursor_;
storage::View view_; storage::View view_;
@ -636,10 +637,7 @@ UniqueCursorPtr ScanAllByLabel::MakeCursor(utils::MemoryResource *mem) const {
ScanAllByEdgeType::ScanAllByEdgeType(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, ScanAllByEdgeType::ScanAllByEdgeType(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol,
storage::EdgeTypeId edge_type, storage::View view) storage::EdgeTypeId edge_type, storage::View view)
: input_(input ? input : std::make_shared<Once>()), : ScanAll(input, output_symbol, view), edge_type_(edge_type) {}
output_symbol_(std::move(output_symbol)),
view_(view),
edge_type_(edge_type) {}
ACCEPT_WITH_INPUT(ScanAllByEdgeType) ACCEPT_WITH_INPUT(ScanAllByEdgeType)
@ -805,6 +803,38 @@ UniqueCursorPtr ScanAllById::MakeCursor(utils::MemoryResource *mem) const {
view_, std::move(vertices), "ScanAllById"); view_, std::move(vertices), "ScanAllById");
} }
ScanAllByEdgeId::ScanAllByEdgeId(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol,
Expression *expression, storage::View view)
: ScanAll(input, output_symbol, view), expression_(expression) {
MG_ASSERT(expression);
}
ACCEPT_WITH_INPUT(ScanAllByEdgeId)
UniqueCursorPtr ScanAllByEdgeId::MakeCursor(utils::MemoryResource *mem) const {
memgraph::metrics::IncrementCounter(memgraph::metrics::ScanAllByEdgeIdOperator);
auto edges = [this](Frame &frame, ExecutionContext &context) -> std::optional<std::vector<EdgeAccessor>> {
auto *db = context.db_accessor;
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, view_);
auto value = expression_->Accept(evaluator);
if (!value.IsNumeric()) return std::nullopt;
int64_t id = value.IsInt() ? value.ValueInt() : value.ValueDouble();
if (value.IsDouble() && id != value.ValueDouble()) return std::nullopt;
auto maybe_edge = db->FindEdge(storage::Gid::FromInt(id), view_);
if (!maybe_edge) return std::nullopt;
return std::vector<EdgeAccessor>{*maybe_edge};
};
return MakeUniqueCursorPtr<ScanAllByEdgeTypeCursor<decltype(edges)>>(
mem, *this, output_symbol_, input_->MakeCursor(mem), view_, std::move(edges), "ScanAllByEdgeId");
}
std::vector<Symbol> ScanAllByEdgeId::ModifiedSymbols(const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(output_symbol_);
return symbols;
}
namespace { namespace {
bool CheckExistingNode(const VertexAccessor &new_node, const Symbol &existing_node_sym, Frame &frame) { bool CheckExistingNode(const VertexAccessor &new_node, const Symbol &existing_node_sym, Frame &frame) {
const TypedValue &existing_node = frame[existing_node_sym]; const TypedValue &existing_node = frame[existing_node_sym];

View File

@ -95,6 +95,7 @@ class ScanAllByLabelPropertyValue;
class ScanAllByLabelProperty; class ScanAllByLabelProperty;
class ScanAllById; class ScanAllById;
class ScanAllByEdgeType; class ScanAllByEdgeType;
class ScanAllByEdgeId;
class Expand; class Expand;
class ExpandVariable; class ExpandVariable;
class ConstructNamedPath; class ConstructNamedPath;
@ -128,13 +129,12 @@ class IndexedJoin;
class HashJoin; class HashJoin;
class RollUpApply; class RollUpApply;
using LogicalOperatorCompositeVisitor = using LogicalOperatorCompositeVisitor = utils::CompositeVisitor<
utils::CompositeVisitor<Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel, ScanAllByLabelPropertyRange, Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel, ScanAllByLabelPropertyRange, ScanAllByLabelPropertyValue,
ScanAllByLabelPropertyValue, ScanAllByLabelProperty, ScanAllById, ScanAllByEdgeType, Expand, ScanAllByLabelProperty, ScanAllById, ScanAllByEdgeType, ScanAllByEdgeId, Expand, ExpandVariable, ConstructNamedPath,
ExpandVariable, ConstructNamedPath, Filter, Produce, Delete, SetProperty, SetProperties, Filter, Produce, Delete, SetProperty, SetProperties, SetLabels, RemoveProperty, RemoveLabels, EdgeUniquenessFilter,
SetLabels, RemoveProperty, RemoveLabels, EdgeUniquenessFilter, Accumulate, Aggregate, Skip, Accumulate, Aggregate, Skip, Limit, OrderBy, Merge, Optional, Unwind, Distinct, Union, Cartesian, CallProcedure,
Limit, OrderBy, Merge, Optional, Unwind, Distinct, Union, Cartesian, CallProcedure, LoadCsv, LoadCsv, Foreach, EmptyResult, EvaluatePatternFilter, Apply, IndexedJoin, HashJoin, RollUpApply>;
Foreach, EmptyResult, EvaluatePatternFilter, Apply, IndexedJoin, HashJoin, RollUpApply>;
using LogicalOperatorLeafVisitor = utils::LeafVisitor<Once>; using LogicalOperatorLeafVisitor = utils::LeafVisitor<Once>;
@ -590,7 +590,7 @@ class ScanAllByLabel : public memgraph::query::plan::ScanAll {
} }
}; };
class ScanAllByEdgeType : public memgraph::query::plan::LogicalOperator { class ScanAllByEdgeType : public memgraph::query::plan::ScanAll {
public: public:
static const utils::TypeInfo kType; static const utils::TypeInfo kType;
const utils::TypeInfo &GetTypeInfo() const override { return kType; } const utils::TypeInfo &GetTypeInfo() const override { return kType; }
@ -610,10 +610,6 @@ class ScanAllByEdgeType : public memgraph::query::plan::LogicalOperator {
return fmt::format("ScanAllByEdgeType ({} :{})", output_symbol_.name(), dba_->EdgeTypeToName(edge_type_)); return fmt::format("ScanAllByEdgeType ({} :{})", output_symbol_.name(), dba_->EdgeTypeToName(edge_type_));
} }
std::shared_ptr<memgraph::query::plan::LogicalOperator> input_;
Symbol output_symbol_;
storage::View view_;
storage::EdgeTypeId edge_type_; storage::EdgeTypeId edge_type_;
std::unique_ptr<LogicalOperator> Clone(AstStorage *storage) const override { std::unique_ptr<LogicalOperator> Clone(AstStorage *storage) const override {
@ -816,6 +812,35 @@ class ScanAllById : public memgraph::query::plan::ScanAll {
return object; return object;
} }
}; };
class ScanAllByEdgeId : public memgraph::query::plan::ScanAll {
public:
static const utils::TypeInfo kType;
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
ScanAllByEdgeId() = default;
ScanAllByEdgeId(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, Expression *expression,
storage::View view = storage::View::OLD);
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
bool HasSingleInput() const override { return true; }
std::shared_ptr<LogicalOperator> input() const override { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) override { input_ = input; }
std::string ToString() const override { return fmt::format("ScanAllByEdgeId ({})", output_symbol_.name()); }
Expression *expression_;
std::unique_ptr<LogicalOperator> Clone(AstStorage *storage) const override {
auto object = std::make_unique<ScanAllByEdgeId>();
object->input_ = input_ ? input_->Clone(storage) : nullptr;
object->output_symbol_ = output_symbol_;
object->view_ = view_;
object->expression_ = expression_ ? expression_->Clone(storage) : nullptr;
return object;
}
};
struct ExpandCommon { struct ExpandCommon {
static const utils::TypeInfo kType; static const utils::TypeInfo kType;

View File

@ -49,9 +49,13 @@ constexpr utils::TypeInfo query::plan::ScanAllByLabelProperty::kType{
constexpr utils::TypeInfo query::plan::ScanAllById::kType{utils::TypeId::SCAN_ALL_BY_ID, "ScanAllById", constexpr utils::TypeInfo query::plan::ScanAllById::kType{utils::TypeId::SCAN_ALL_BY_ID, "ScanAllById",
&query::plan::ScanAll::kType}; &query::plan::ScanAll::kType};
constexpr utils::TypeInfo query::plan::ScanAllByEdgeType::kType{utils::TypeId::SCAN_ALL_BY_EDGE_TYPE, constexpr utils::TypeInfo query::plan::ScanAllByEdgeType::kType{utils::TypeId::SCAN_ALL_BY_EDGE_TYPE,
"ScanAllByEdgeType", &query::plan::ScanAll::kType}; "ScanAllByEdgeType", &query::plan::ScanAll::kType};
constexpr utils::TypeInfo query::plan::ScanAllByEdgeId::kType{utils::TypeId::SCAN_ALL_BY_ID, "ScanAllByEdgeId",
&query::plan::ScanAll::kType};
constexpr utils::TypeInfo query::plan::ExpandCommon::kType{utils::TypeId::EXPAND_COMMON, "ExpandCommon", nullptr}; constexpr utils::TypeInfo query::plan::ExpandCommon::kType{utils::TypeId::EXPAND_COMMON, "ExpandCommon", nullptr};
constexpr utils::TypeInfo query::plan::Expand::kType{utils::TypeId::EXPAND, "Expand", constexpr utils::TypeInfo query::plan::Expand::kType{utils::TypeId::EXPAND, "Expand",

View File

@ -83,6 +83,11 @@ bool PlanPrinter::PreVisit(query::plan::ScanAllByEdgeType &op) {
return true; return true;
} }
bool PlanPrinter::PreVisit(query::plan::ScanAllByEdgeId &op) {
WithPrintLn([&op](auto &out) { out << "* " << op.ToString(); });
return true;
}
bool PlanPrinter::PreVisit(query::plan::Expand &op) { bool PlanPrinter::PreVisit(query::plan::Expand &op) {
op.dba_ = dba_; op.dba_ = dba_;
WithPrintLn([&op](auto &out) { out << "* " << op.ToString(); }); WithPrintLn([&op](auto &out) { out << "* " << op.ToString(); });
@ -496,6 +501,16 @@ bool PlanToJsonVisitor::PreVisit(ScanAllByEdgeType &op) {
return false; return false;
} }
bool PlanToJsonVisitor::PreVisit(ScanAllByEdgeId &op) {
json self;
self["name"] = "ScanAllByEdgeId";
self["output_symbol"] = ToJson(op.output_symbol_);
op.input_->Accept(*this);
self["input"] = PopOutput();
output_ = std::move(self);
return false;
}
bool PlanToJsonVisitor::PreVisit(CreateNode &op) { bool PlanToJsonVisitor::PreVisit(CreateNode &op) {
json self; json self;
self["name"] = "CreateNode"; self["name"] = "CreateNode";

View File

@ -68,6 +68,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
bool PreVisit(ScanAllByLabelProperty &) override; bool PreVisit(ScanAllByLabelProperty &) override;
bool PreVisit(ScanAllById &) override; bool PreVisit(ScanAllById &) override;
bool PreVisit(ScanAllByEdgeType &) override; bool PreVisit(ScanAllByEdgeType &) override;
bool PreVisit(ScanAllByEdgeId &) override;
bool PreVisit(Expand &) override; bool PreVisit(Expand &) override;
bool PreVisit(ExpandVariable &) override; bool PreVisit(ExpandVariable &) override;
@ -206,6 +207,7 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor {
bool PreVisit(ScanAllByLabelProperty &) override; bool PreVisit(ScanAllByLabelProperty &) override;
bool PreVisit(ScanAllById &) override; bool PreVisit(ScanAllById &) override;
bool PreVisit(ScanAllByEdgeType &) override; bool PreVisit(ScanAllByEdgeType &) override;
bool PreVisit(ScanAllByEdgeId &) override;
bool PreVisit(EmptyResult &) override; bool PreVisit(EmptyResult &) override;
bool PreVisit(Produce &) override; bool PreVisit(Produce &) override;

View File

@ -48,11 +48,26 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
bool PreVisit(Filter &op) override { bool PreVisit(Filter &op) override {
prev_ops_.push_back(&op); prev_ops_.push_back(&op);
filters_.CollectFilterExpression(op.expression_, *symbol_table_);
return true; return true;
} }
bool PostVisit(Filter & /*op*/) override { bool PostVisit(Filter &op) override {
prev_ops_.pop_back(); prev_ops_.pop_back();
ExpressionRemovalResult removal = RemoveExpressions(op.expression_, filter_exprs_for_removal_);
op.expression_ = removal.trimmed_expression;
if (op.expression_) {
Filters leftover_filters;
leftover_filters.CollectFilterExpression(op.expression_, *symbol_table_);
op.all_filters_ = std::move(leftover_filters);
}
if (!op.expression_) {
SetOnParent(op.input());
}
return true; return true;
} }
@ -70,7 +85,7 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
bool PostVisit(ScanAll &op) override { bool PostVisit(ScanAll &op) override {
prev_ops_.pop_back(); prev_ops_.pop_back();
if (EdgeTypeIndexingPossible()) { if (EdgeTypeIndexingPossible() || maybe_id_lookup_value_) {
SetOnParent(op.input()); SetOnParent(op.input());
} }
@ -88,6 +103,25 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
edge_type_index_exist = only_one_edge_type ? db_->EdgeTypeIndexExists(op.common_.edge_types.front()) : false; edge_type_index_exist = only_one_edge_type ? db_->EdgeTypeIndexExists(op.common_.edge_types.front()) : false;
scanall_under_expand_ = only_one_edge_type && expansion_is_named && expdanded_node_not_named; scanall_under_expand_ = only_one_edge_type && expansion_is_named && expdanded_node_not_named;
const auto &output_symbol = op.common_.edge_symbol;
const auto &modified_symbols = op.ModifiedSymbols(*symbol_table_);
std::unordered_set<Symbol> bound_symbols(modified_symbols.begin(), modified_symbols.end());
auto are_bound = [&bound_symbols](const auto &used_symbols) {
for (const auto &used_symbol : used_symbols) {
if (!utils::Contains(bound_symbols, used_symbol)) {
return false;
}
}
return true;
};
for (const auto &filter : filters_.IdFilters(output_symbol)) {
if (filter.id_filter->is_symbol_in_value_ || !are_bound(filter.used_symbols)) continue;
maybe_id_lookup_value_ = filter.id_filter->value_;
filter_exprs_for_removal_.insert(filter.expression);
filters_.EraseFilter(filter);
}
} }
return true; return true;
@ -96,9 +130,10 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
bool PostVisit(Expand &op) override { bool PostVisit(Expand &op) override {
prev_ops_.pop_back(); prev_ops_.pop_back();
if (EdgeTypeIndexingPossible()) { if (EdgeTypeIndexingPossible() || maybe_id_lookup_value_) {
auto indexed_scan = GenEdgeTypeScan(op); auto indexed_scan = GenEdgeTypeScan(op);
SetOnParent(std::move(indexed_scan)); SetOnParent(std::move(indexed_scan));
return true;
} }
return true; return true;
@ -254,6 +289,15 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
return true; return true;
} }
bool PreVisit(ScanAllByEdgeId &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(ScanAllByEdgeId &) override {
prev_ops_.pop_back();
return true;
}
bool PreVisit(ConstructNamedPath &op) override { bool PreVisit(ConstructNamedPath &op) override {
prev_ops_.push_back(&op); prev_ops_.push_back(&op);
return true; return true;
@ -491,9 +535,12 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
std::vector<LogicalOperator *> prev_ops_; std::vector<LogicalOperator *> prev_ops_;
std::unordered_set<Symbol> cartesian_symbols_; std::unordered_set<Symbol> cartesian_symbols_;
memgraph::query::Expression *maybe_id_lookup_value_ = nullptr;
bool EdgeTypeIndexingPossible() const { bool EdgeTypeIndexingPossible() const {
return expand_under_produce_ && scanall_under_expand_ && once_under_scanall_ && edge_type_index_exist; return expand_under_produce_ && scanall_under_expand_ && once_under_scanall_ && edge_type_index_exist;
} }
bool expand_under_produce_ = false; bool expand_under_produce_ = false;
bool scanall_under_expand_ = false; bool scanall_under_expand_ = false;
bool once_under_scanall_ = false; bool once_under_scanall_ = false;
@ -503,16 +550,23 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
throw utils::NotYetImplemented("Operator not yet covered by EdgeTypeIndexRewriter"); throw utils::NotYetImplemented("Operator not yet covered by EdgeTypeIndexRewriter");
} }
std::unique_ptr<ScanAllByEdgeType> GenEdgeTypeScan(const Expand &expand) { std::unique_ptr<ScanAll> GenEdgeTypeScan(const Expand &expand) {
const auto &input = expand.input(); const auto &input = expand.input();
const auto &output_symbol = expand.common_.edge_symbol; const auto &output_symbol = expand.common_.edge_symbol;
const auto &view = expand.view_; const auto &view = expand.view_;
// Extract edge_type from symbol if (EdgeTypeIndexingPossible()) {
auto edge_type = expand.common_.edge_types.front(); auto edge_type = expand.common_.edge_types.front();
return std::make_unique<ScanAllByEdgeType>(input, output_symbol, edge_type, view); return std::make_unique<ScanAllByEdgeType>(input, output_symbol, edge_type, view);
} }
if (maybe_id_lookup_value_) {
return std::make_unique<ScanAllByEdgeId>(input, output_symbol, maybe_id_lookup_value_, view);
}
LOG_FATAL("Fatal error while rewriting query plan.");
}
void SetOnParent(const std::shared_ptr<LogicalOperator> &input) { void SetOnParent(const std::shared_ptr<LogicalOperator> &input) {
MG_ASSERT(input); MG_ASSERT(input);
if (prev_ops_.empty()) { if (prev_ops_.empty()) {

View File

@ -36,6 +36,7 @@ struct SalientConfig {
StorageMode storage_mode{StorageMode::IN_MEMORY_TRANSACTIONAL}; StorageMode storage_mode{StorageMode::IN_MEMORY_TRANSACTIONAL};
struct Items { struct Items {
bool properties_on_edges{true}; bool properties_on_edges{true};
bool enable_edges_metadata{false};
bool enable_schema_metadata{false}; bool enable_schema_metadata{false};
bool delta_on_identical_property_update{true}; bool delta_on_identical_property_update{true};
friend bool operator==(const Items &lrh, const Items &rhs) = default; friend bool operator==(const Items &lrh, const Items &rhs) = default;
@ -46,11 +47,13 @@ struct SalientConfig {
inline void to_json(nlohmann::json &data, SalientConfig::Items const &items) { inline void to_json(nlohmann::json &data, SalientConfig::Items const &items) {
data = nlohmann::json{{"properties_on_edges", items.properties_on_edges}, data = nlohmann::json{{"properties_on_edges", items.properties_on_edges},
{"enable_edges_metadata", items.enable_edges_metadata},
{"enable_schema_metadata", items.enable_schema_metadata}}; {"enable_schema_metadata", items.enable_schema_metadata}};
} }
inline void from_json(const nlohmann::json &data, SalientConfig::Items &items) { inline void from_json(const nlohmann::json &data, SalientConfig::Items &items) {
data.at("properties_on_edges").get_to(items.properties_on_edges); data.at("properties_on_edges").get_to(items.properties_on_edges);
data.at("enable_edges_metadata").get_to(items.enable_edges_metadata);
data.at("enable_schema_metadata").get_to(items.enable_schema_metadata); data.at("enable_schema_metadata").get_to(items.enable_schema_metadata);
} }

View File

@ -916,6 +916,10 @@ std::optional<VertexAccessor> DiskStorage::DiskAccessor::FindVertex(storage::Gid
return disk_storage->FindVertex(gid, &transaction_, view); return disk_storage->FindVertex(gid, &transaction_, view);
} }
std::optional<EdgeAccessor> DiskStorage::DiskAccessor::FindEdge(storage::Gid gid, View view) {
throw utils::NotYetImplemented("Id based lookup for on-disk storage mode is not yet implemented on edges.");
}
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>> Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>>
DiskStorage::DiskAccessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, DiskStorage::DiskAccessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges,
bool detach) { bool detach) {

View File

@ -72,6 +72,8 @@ class DiskStorage final : public Storage {
const std::optional<utils::Bound<PropertyValue>> &lower_bound, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override; const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override;
std::optional<EdgeAccessor> FindEdge(Gid gid, View view) override;
EdgesIterable Edges(EdgeTypeId edge_type, View view) override; EdgesIterable Edges(EdgeTypeId edge_type, View view) override;
uint64_t ApproximateVertexCount() const override; uint64_t ApproximateVertexCount() const override;

View File

@ -300,6 +300,7 @@ std::optional<ParallelizedSchemaCreationInfo> GetParallelExecInfoIndices(const R
std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state, std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges, utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
utils::SkipList<EdgeMetadata> *edges_metadata,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, const Config &config, Indices *indices, Constraints *constraints, const Config &config,
uint64_t *wal_seq_num) { uint64_t *wal_seq_num) {
@ -334,7 +335,8 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
} }
spdlog::info("Starting snapshot recovery from {}.", path); spdlog::info("Starting snapshot recovery from {}.", path);
try { try {
recovered_snapshot = LoadSnapshot(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config); recovered_snapshot =
LoadSnapshot(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count, config);
spdlog::info("Snapshot recovery successful!"); spdlog::info("Snapshot recovery successful!");
break; break;
} catch (const RecoveryFailure &e) { } catch (const RecoveryFailure &e) {

View File

@ -134,6 +134,7 @@ struct Recovery {
/// @throw std::bad_alloc /// @throw std::bad_alloc
std::optional<RecoveryInfo> RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state, std::optional<RecoveryInfo> RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges, utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
utils::SkipList<EdgeMetadata> *edges_metadata,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, const Config &config, Indices *indices, Constraints *constraints, const Config &config,
uint64_t *wal_seq_num); uint64_t *wal_seq_num);

View File

@ -427,6 +427,7 @@ struct LoadPartialConnectivityResult {
template <typename TEdgeTypeFromIdFunc> template <typename TEdgeTypeFromIdFunc>
LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::path &path, LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::path &path,
utils::SkipList<Vertex> &vertices, utils::SkipList<Edge> &edges, utils::SkipList<Vertex> &vertices, utils::SkipList<Edge> &edges,
utils::SkipList<EdgeMetadata> &edges_metadata,
const uint64_t from_offset, const uint64_t vertices_count, const uint64_t from_offset, const uint64_t vertices_count,
const SalientConfig::Items items, const bool snapshot_has_edges, const SalientConfig::Items items, const bool snapshot_has_edges,
TEdgeTypeFromIdFunc get_edge_type_from_id) { TEdgeTypeFromIdFunc get_edge_type_from_id) {
@ -437,6 +438,7 @@ LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::pat
auto vertex_acc = vertices.access(); auto vertex_acc = vertices.access();
auto edge_acc = edges.access(); auto edge_acc = edges.access();
auto edge_metadata_acc = edges_metadata.access();
// Read the first gid to find the necessary iterator in vertices // Read the first gid to find the necessary iterator in vertices
const auto first_vertex_gid = std::invoke([&]() mutable { const auto first_vertex_gid = std::invoke([&]() mutable {
@ -580,6 +582,9 @@ LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::pat
auto [edge, inserted] = edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr}); auto [edge, inserted] = edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr});
edge_ref = EdgeRef(&*edge); edge_ref = EdgeRef(&*edge);
} }
if (items.enable_edges_metadata) {
edge_metadata_acc.insert(EdgeMetadata{Gid::FromUint(*edge_gid), &vertex});
}
} }
vertex.out_edges.emplace_back(get_edge_type_from_id(*edge_type), &*to_vertex, edge_ref); vertex.out_edges.emplace_back(get_edge_type_from_id(*edge_type), &*to_vertex, edge_ref);
// Increment edge count. We only increment the count here because the // Increment edge count. We only increment the count here because the
@ -626,7 +631,7 @@ void RecoverOnMultipleThreads(size_t thread_count, const TFunc &func, const std:
} }
RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices, RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history, std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
SalientConfig::Items items) { SalientConfig::Items items) {
@ -644,6 +649,7 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
if (!success) { if (!success) {
edges->clear(); edges->clear();
vertices->clear(); vertices->clear();
edges_metadata->clear();
epoch_history->clear(); epoch_history->clear();
} }
}); });
@ -1098,7 +1104,7 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
} }
RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices, RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history, std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
const Config &config) { const Config &config) {
@ -1118,6 +1124,7 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
if (!success) { if (!success) {
edges->clear(); edges->clear();
vertices->clear(); vertices->clear();
edges_metadata->clear();
epoch_history->clear(); epoch_history->clear();
} }
}); });
@ -1226,10 +1233,10 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
RecoverOnMultipleThreads( RecoverOnMultipleThreads(
config.durability.recovery_thread_count, config.durability.recovery_thread_count,
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id, [path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) { &get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items, const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
snapshot_has_edges, get_edge_type_from_id); batch.count, items, snapshot_has_edges, get_edge_type_from_id);
edge_count->fetch_add(result.edge_count); edge_count->fetch_add(result.edge_count);
auto known_highest_edge_gid = highest_edge_gid.load(); auto known_highest_edge_gid = highest_edge_gid.load();
while (known_highest_edge_gid < result.highest_edge_id) { while (known_highest_edge_gid < result.highest_edge_id) {
@ -1387,7 +1394,7 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
} }
RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices, RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history, std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
const Config &config) { const Config &config) {
@ -1407,6 +1414,7 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
if (!success) { if (!success) {
edges->clear(); edges->clear();
vertices->clear(); vertices->clear();
edges_metadata->clear();
epoch_history->clear(); epoch_history->clear();
} }
}); });
@ -1515,10 +1523,10 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
RecoverOnMultipleThreads( RecoverOnMultipleThreads(
config.durability.recovery_thread_count, config.durability.recovery_thread_count,
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id, [path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) { &get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items, const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
snapshot_has_edges, get_edge_type_from_id); batch.count, items, snapshot_has_edges, get_edge_type_from_id);
edge_count->fetch_add(result.edge_count); edge_count->fetch_add(result.edge_count);
auto known_highest_edge_gid = highest_edge_gid.load(); auto known_highest_edge_gid = highest_edge_gid.load();
while (known_highest_edge_gid < result.highest_edge_id) { while (known_highest_edge_gid < result.highest_edge_id) {
@ -1730,7 +1738,7 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
} }
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices, RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history, std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config) { NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config) {
RecoveryInfo recovery_info; RecoveryInfo recovery_info;
@ -1742,14 +1750,16 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
if (!IsVersionSupported(*version)) throw RecoveryFailure(fmt::format("Invalid snapshot version {}", *version)); if (!IsVersionSupported(*version)) throw RecoveryFailure(fmt::format("Invalid snapshot version {}", *version));
if (*version == 14U) { if (*version == 14U) {
return LoadSnapshotVersion14(path, vertices, edges, epoch_history, name_id_mapper, edge_count, return LoadSnapshotVersion14(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
config.salient.items); config.salient.items);
} }
if (*version == 15U) { if (*version == 15U) {
return LoadSnapshotVersion15(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config); return LoadSnapshotVersion15(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
config);
} }
if (*version == 16U) { if (*version == 16U) {
return LoadSnapshotVersion16(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config); return LoadSnapshotVersion16(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
config);
} }
// Cleanup of loaded data in case of failure. // Cleanup of loaded data in case of failure.
@ -1758,6 +1768,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
if (!success) { if (!success) {
edges->clear(); edges->clear();
vertices->clear(); vertices->clear();
edges_metadata->clear();
epoch_history->clear(); epoch_history->clear();
} }
}); });
@ -1866,10 +1877,10 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
RecoverOnMultipleThreads( RecoverOnMultipleThreads(
config.durability.recovery_thread_count, config.durability.recovery_thread_count,
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id, [path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) { &get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items, const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
snapshot_has_edges, get_edge_type_from_id); batch.count, items, snapshot_has_edges, get_edge_type_from_id);
edge_count->fetch_add(result.edge_count); edge_count->fetch_add(result.edge_count);
auto known_highest_edge_gid = highest_edge_gid.load(); auto known_highest_edge_gid = highest_edge_gid.load();
while (known_highest_edge_gid < result.highest_edge_id) { while (known_highest_edge_gid < result.highest_edge_id) {

View File

@ -64,7 +64,7 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path);
/// Function used to load the snapshot data into the storage. /// Function used to load the snapshot data into the storage.
/// @throw RecoveryFailure /// @throw RecoveryFailure
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices, RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history, std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config); NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config);

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd. // Copyright 2024 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -49,4 +49,18 @@ inline bool operator<(const Edge &first, const Edge &second) { return first.gid
inline bool operator==(const Edge &first, const Gid &second) { return first.gid == second; } inline bool operator==(const Edge &first, const Gid &second) { return first.gid == second; }
inline bool operator<(const Edge &first, const Gid &second) { return first.gid < second; } inline bool operator<(const Edge &first, const Gid &second) { return first.gid < second; }
struct EdgeMetadata {
EdgeMetadata(Gid gid, Vertex *from_vertex) : gid(gid), from_vertex(from_vertex) {}
Gid gid;
Vertex *from_vertex;
};
static_assert(alignof(Edge) >= 8, "The Edge should be aligned to at least 8!");
inline bool operator==(const EdgeMetadata &first, const EdgeMetadata &second) { return first.gid == second.gid; }
inline bool operator<(const EdgeMetadata &first, const EdgeMetadata &second) { return first.gid < second.gid; }
inline bool operator==(const EdgeMetadata &first, const Gid &second) { return first.gid == second; }
inline bool operator<(const EdgeMetadata &first, const Gid &second) { return first.gid < second; }
} // namespace memgraph::storage } // namespace memgraph::storage

View File

@ -16,6 +16,7 @@
#include <tuple> #include <tuple>
#include "storage/v2/delta.hpp" #include "storage/v2/delta.hpp"
#include "storage/v2/edge_info_helpers.hpp"
#include "storage/v2/mvcc.hpp" #include "storage/v2/mvcc.hpp"
#include "storage/v2/property_store.hpp" #include "storage/v2/property_store.hpp"
#include "storage/v2/property_value.hpp" #include "storage/v2/property_value.hpp"
@ -26,6 +27,15 @@
#include "utils/memory_tracker.hpp" #include "utils/memory_tracker.hpp"
namespace memgraph::storage { namespace memgraph::storage {
std::optional<EdgeAccessor> EdgeAccessor::Create(EdgeRef edge, EdgeTypeId edge_type, Vertex *from_vertex,
Vertex *to_vertex, Storage *storage, Transaction *transaction,
View view, bool for_deleted) {
if (!IsEdgeVisible(edge.ptr, transaction, view)) {
return std::nullopt;
}
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage, transaction, for_deleted);
}
bool EdgeAccessor::IsDeleted() const { bool EdgeAccessor::IsDeleted() const {
if (!storage_->config_.salient.items.properties_on_edges) { if (!storage_->config_.salient.items.properties_on_edges) {

View File

@ -44,6 +44,10 @@ class EdgeAccessor final {
transaction_(transaction), transaction_(transaction),
for_deleted_(for_deleted) {} for_deleted_(for_deleted) {}
static std::optional<EdgeAccessor> Create(EdgeRef edge, EdgeTypeId edge_type, Vertex *from_vertex, Vertex *to_vertex,
Storage *storage, Transaction *transaction, View view,
bool for_deleted = false);
bool IsDeleted() const; bool IsDeleted() const;
/// @return true if the object is visible from the current transaction /// @return true if the object is visible from the current transaction

View File

@ -0,0 +1,56 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "storage/v2/delta.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/view.hpp"
#include <shared_mutex>
namespace memgraph::storage {
inline bool IsEdgeVisible(Edge *edge, const Transaction *transaction, View view) {
bool exists = true;
bool deleted = true;
Delta *delta = nullptr;
{
auto guard = std::shared_lock{edge->lock};
deleted = edge->deleted;
delta = edge->delta;
}
ApplyDeltasForRead(transaction, delta, view, [&](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
exists = false;
break;
}
}
});
return exists && !deleted;
}
} // namespace memgraph::storage

View File

@ -12,6 +12,7 @@
#include "storage/v2/inmemory/edge_type_index.hpp" #include "storage/v2/inmemory/edge_type_index.hpp"
#include "storage/v2/constraints/constraints.hpp" #include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/edge_info_helpers.hpp"
#include "storage/v2/indices/indices_utils.hpp" #include "storage/v2/indices/indices_utils.hpp"
#include "utils/counter.hpp" #include "utils/counter.hpp"
@ -25,39 +26,6 @@ using EdgeTypeId = memgraph::storage::EdgeTypeId;
using Transaction = memgraph::storage::Transaction; using Transaction = memgraph::storage::Transaction;
using View = memgraph::storage::View; using View = memgraph::storage::View;
bool IsIndexEntryVisible(Edge *edge, const Transaction *transaction, View view) {
bool exists = true;
bool deleted = true;
Delta *delta = nullptr;
{
auto guard = std::shared_lock{edge->lock};
deleted = edge->deleted;
delta = edge->delta;
}
ApplyDeltasForRead(transaction, delta, view, [&](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
exists = false;
break;
}
}
});
return exists && !deleted;
}
using ReturnType = std::optional<std::tuple<EdgeTypeId, Vertex *, EdgeRef>>; using ReturnType = std::optional<std::tuple<EdgeTypeId, Vertex *, EdgeRef>>;
ReturnType VertexDeletedConnectedEdges(Vertex *vertex, Edge *edge, const Transaction *transaction, View view) { ReturnType VertexDeletedConnectedEdges(Vertex *vertex, Edge *edge, const Transaction *transaction, View view) {
ReturnType link; ReturnType link;
@ -242,7 +210,7 @@ void InMemoryEdgeTypeIndex::Iterable::Iterator::AdvanceUntilValid() {
auto *from_vertex = index_iterator_->from_vertex; auto *from_vertex = index_iterator_->from_vertex;
auto *to_vertex = index_iterator_->to_vertex; auto *to_vertex = index_iterator_->to_vertex;
if (!IsIndexEntryVisible(index_iterator_->edge, self_->transaction_, self_->view_) || from_vertex->deleted || if (!IsEdgeVisible(index_iterator_->edge, self_->transaction_, self_->view_) || from_vertex->deleted ||
to_vertex->deleted) { to_vertex->deleted) {
continue; continue;
} }

View File

@ -104,7 +104,7 @@ InMemoryStorage::InMemoryStorage(Config config)
config_.durability.storage_directory); config_.durability.storage_directory);
} }
if (config_.durability.recover_on_startup) { if (config_.durability.recover_on_startup) {
auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edge_count_, auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edges_metadata_, &edge_count_,
name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_); name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_);
if (info) { if (info) {
vertex_id_ = info->next_vertex_id; vertex_id_ = info->next_vertex_id;
@ -341,6 +341,11 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
if (delta) { if (delta) {
delta->prev.Set(&*it); delta->prev.Set(&*it);
} }
if (config_.enable_edges_metadata) {
auto acc = mem_storage->edges_metadata_.access();
auto [_, inserted] = acc.insert(EdgeMetadata(gid, from->vertex_));
MG_ASSERT(inserted, "The edge must be inserted here!");
}
} }
utils::AtomicMemoryBlock atomic_memory_block{ utils::AtomicMemoryBlock atomic_memory_block{
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() { [this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
@ -443,6 +448,11 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
if (delta) { if (delta) {
delta->prev.Set(&*it); delta->prev.Set(&*it);
} }
if (config_.enable_edges_metadata) {
auto acc = mem_storage->edges_metadata_.access();
auto [_, inserted] = acc.insert(EdgeMetadata(gid, from->vertex_));
MG_ASSERT(inserted, "The edge must be inserted here!");
}
} }
utils::AtomicMemoryBlock atomic_memory_block{ utils::AtomicMemoryBlock atomic_memory_block{
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() { [this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
@ -464,6 +474,15 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
} }
void InMemoryStorage::UpdateEdgesMetadataOnModification(Edge *edge, Vertex *from_vertex) {
auto edge_metadata_acc = edges_metadata_.access();
auto edge_to_modify = edge_metadata_acc.find(edge->gid);
if (edge_to_modify == edge_metadata_acc.end()) {
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
}
edge_to_modify->from_vertex = from_vertex;
}
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) { Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) {
MG_ASSERT(edge->transaction_ == new_from->transaction_, MG_ASSERT(edge->transaction_ == new_from->transaction_,
"EdgeAccessor must be from the same transaction as the new from vertex " "EdgeAccessor must be from the same transaction as the new from vertex "
@ -564,6 +583,10 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor
mem_edge_type_index->UpdateOnEdgeModification(old_from_vertex, to_vertex, new_from_vertex, to_vertex, edge_ref, mem_edge_type_index->UpdateOnEdgeModification(old_from_vertex, to_vertex, new_from_vertex, to_vertex, edge_ref,
edge_type, transaction_); edge_type, transaction_);
if (config_.enable_edges_metadata) {
in_memory->UpdateEdgesMetadataOnModification(edge_ref.ptr, new_from_vertex);
}
transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT); transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT); transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
@ -987,8 +1010,10 @@ void InMemoryStorage::InMemoryAccessor::FastDiscardOfDeltas(uint64_t oldest_acti
if (!current_deleted_edges.empty()) { if (!current_deleted_edges.empty()) {
// 3.c) remove from edge skip_list // 3.c) remove from edge skip_list
auto edge_acc = mem_storage->edges_.access(); auto edge_acc = mem_storage->edges_.access();
auto edge_metadata_acc = mem_storage->edges_metadata_.access();
for (auto gid : current_deleted_edges) { for (auto gid : current_deleted_edges) {
edge_acc.remove(gid); edge_acc.remove(gid);
edge_metadata_acc.remove(gid);
} }
} }
} }
@ -1234,8 +1259,10 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
// EDGES // EDGES
{ {
auto edges_acc = mem_storage->edges_.access(); auto edges_acc = mem_storage->edges_.access();
auto edges_metadata_acc = mem_storage->edges_metadata_.access();
for (auto gid : my_deleted_edges) { for (auto gid : my_deleted_edges) {
edges_acc.remove(gid); edges_acc.remove(gid);
edges_metadata_acc.remove(gid);
} }
} }
} }
@ -1438,6 +1465,62 @@ EdgesIterable InMemoryStorage::InMemoryAccessor::Edges(EdgeTypeId edge_type, Vie
return EdgesIterable(mem_edge_type_index->Edges(edge_type, view, storage_, &transaction_)); return EdgesIterable(mem_edge_type_index->Edges(edge_type, view, storage_, &transaction_));
} }
std::optional<EdgeAccessor> InMemoryStorage::InMemoryAccessor::FindEdge(Gid gid, View view) {
using EdgeInfo = std::optional<std::tuple<EdgeRef, EdgeTypeId, Vertex *, Vertex *>>;
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
auto edge_acc = mem_storage->edges_.access();
auto edge_it = edge_acc.find(gid);
if (edge_it == edge_acc.end()) {
return std::nullopt;
}
auto *edge_ptr = &(*edge_it);
auto vertices_acc = mem_storage->vertices_.access();
auto extract_edge_info = [&](Vertex *from_vertex) -> EdgeInfo {
for (auto &out_edge : from_vertex->out_edges) {
if (std::get<2>(out_edge).ptr == edge_ptr) {
return std::tuple(std::get<2>(out_edge), std::get<0>(out_edge), from_vertex, std::get<1>(out_edge));
}
}
return std::nullopt;
};
auto edge_accessor_from_info = [this, view](EdgeInfo &maybe_edge_info) -> std::optional<EdgeAccessor> {
if (!maybe_edge_info) {
return std::nullopt;
}
auto &edge_info = *maybe_edge_info;
return EdgeAccessor::Create(std::get<0>(edge_info), std::get<1>(edge_info), std::get<2>(edge_info),
std::get<3>(edge_info), storage_, &transaction_, view);
};
if (mem_storage->config_.salient.items.enable_edges_metadata) {
auto edge_metadata_acc = mem_storage->edges_metadata_.access();
auto edge_metadata_it = edge_metadata_acc.find(gid);
MG_ASSERT(edge_metadata_it != edge_metadata_acc.end(), "Invalid database state!");
auto maybe_edge_info = extract_edge_info(edge_metadata_it->from_vertex);
return edge_accessor_from_info(maybe_edge_info);
}
// If metadata on edges is not enabled we will have to do
// a full scan.
auto maybe_edge_info = std::invoke([&]() -> EdgeInfo {
for (auto &from_vertex : vertices_acc) {
auto maybe_edge_info = extract_edge_info(&from_vertex);
if (maybe_edge_info) {
return maybe_edge_info;
}
}
return std::nullopt;
});
return edge_accessor_from_info(maybe_edge_info);
}
Transaction InMemoryStorage::CreateTransaction( Transaction InMemoryStorage::CreateTransaction(
IsolationLevel isolation_level, StorageMode storage_mode, IsolationLevel isolation_level, StorageMode storage_mode,
memgraph::replication_coordination_glue::ReplicationRole replication_role) { memgraph::replication_coordination_glue::ReplicationRole replication_role) {
@ -1786,8 +1869,12 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
} }
{ {
auto edge_acc = edges_.access(); auto edge_acc = edges_.access();
auto edge_metadata_acc = edges_metadata_.access();
for (auto edge : current_deleted_edges) { for (auto edge : current_deleted_edges) {
MG_ASSERT(edge_acc.remove(edge), "Invalid database state!"); MG_ASSERT(edge_acc.remove(edge), "Invalid database state!");
if (config_.salient.items.enable_edges_metadata) {
MG_ASSERT(edge_metadata_acc.remove(edge), "Invalid database state!");
}
} }
} }
@ -1809,10 +1896,12 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
// EXPENSIVE full scan, is only run if an IN_MEMORY_ANALYTICAL transaction involved any deletions // EXPENSIVE full scan, is only run if an IN_MEMORY_ANALYTICAL transaction involved any deletions
if (need_full_scan_edges) { if (need_full_scan_edges) {
auto edge_acc = edges_.access(); auto edge_acc = edges_.access();
auto edge_metadata_acc = edges_metadata_.access();
for (auto &edge : edge_acc) { for (auto &edge : edge_acc) {
// a deleted edge which as no deltas must have come from IN_MEMORY_ANALYTICAL deletion // a deleted edge which as no deltas must have come from IN_MEMORY_ANALYTICAL deletion
if (edge.delta == nullptr && edge.deleted) { if (edge.delta == nullptr && edge.deleted) {
edge_acc.remove(edge); edge_acc.remove(edge);
edge_metadata_acc.remove(edge.gid);
} }
} }
} }

View File

@ -109,6 +109,8 @@ class InMemoryStorage final : public Storage {
const std::optional<utils::Bound<PropertyValue>> &lower_bound, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override; const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override;
std::optional<EdgeAccessor> FindEdge(Gid gid, View view) override;
EdgesIterable Edges(EdgeTypeId edge_type, View view) override; EdgesIterable Edges(EdgeTypeId edge_type, View view) override;
/// Return approximate number of all vertices in the database. /// Return approximate number of all vertices in the database.
@ -424,9 +426,12 @@ class InMemoryStorage final : public Storage {
void PrepareForNewEpoch() override; void PrepareForNewEpoch() override;
void UpdateEdgesMetadataOnModification(Edge *edge, Vertex *from_vertex);
// Main object storage // Main object storage
utils::SkipList<storage::Vertex> vertices_; utils::SkipList<storage::Vertex> vertices_;
utils::SkipList<storage::Edge> edges_; utils::SkipList<storage::Edge> edges_;
utils::SkipList<storage::EdgeMetadata> edges_metadata_;
// Durability // Durability
durability::Recovery recovery_; durability::Recovery recovery_;

View File

@ -179,6 +179,8 @@ class Storage {
const std::optional<utils::Bound<PropertyValue>> &lower_bound, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) = 0; const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) = 0;
virtual std::optional<EdgeAccessor> FindEdge(Gid gid, View view) = 0;
virtual EdgesIterable Edges(EdgeTypeId edge_type, View view) = 0; virtual EdgesIterable Edges(EdgeTypeId edge_type, View view) = 0;
virtual Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex); virtual Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex);

View File

@ -27,6 +27,7 @@
M(ScanAllByLabelPropertyOperator, Operator, "Number of times ScanAllByLabelProperty operator was used.") \ M(ScanAllByLabelPropertyOperator, Operator, "Number of times ScanAllByLabelProperty operator was used.") \
M(ScanAllByIdOperator, Operator, "Number of times ScanAllById operator was used.") \ M(ScanAllByIdOperator, Operator, "Number of times ScanAllById operator was used.") \
M(ScanAllByEdgeTypeOperator, Operator, "Number of times ScanAllByEdgeTypeOperator operator was used.") \ M(ScanAllByEdgeTypeOperator, Operator, "Number of times ScanAllByEdgeTypeOperator operator was used.") \
M(ScanAllByEdgeIdOperator, Operator, "Number of times ScanAllByEdgeIdOperator operator was used.") \
M(ExpandOperator, Operator, "Number of times Expand operator was used.") \ M(ExpandOperator, Operator, "Number of times Expand operator was used.") \
M(ExpandVariableOperator, Operator, "Number of times ExpandVariable operator was used.") \ M(ExpandVariableOperator, Operator, "Number of times ExpandVariable operator was used.") \
M(ConstructNamedPathOperator, Operator, "Number of times ConstructNamedPath operator was used.") \ M(ConstructNamedPathOperator, Operator, "Number of times ConstructNamedPath operator was used.") \

View File

@ -124,6 +124,11 @@ startup_config_dict = {
"false", "false",
"Controls whether metadata should be collected about the resident labels and edge types.", "Controls whether metadata should be collected about the resident labels and edge types.",
), ),
"storage_enable_edges_metadata": (
"false",
"false",
"Controls whether additional metadata should be stored about the edges in order to do faster traversals on certain queries.",
),
"password_encryption_algorithm": ("bcrypt", "bcrypt", "The password encryption algorithm used for authentication."), "password_encryption_algorithm": ("bcrypt", "bcrypt", "The password encryption algorithm used for authentication."),
"pulsar_service_url": ("", "", "Default URL used while connecting to Pulsar brokers."), "pulsar_service_url": ("", "", "Default URL used while connecting to Pulsar brokers."),
"query_execution_timeout_sec": ( "query_execution_timeout_sec": (

View File

@ -1704,6 +1704,13 @@ TYPED_TEST(TestPlanner, ScanAllById) {
CheckPlan<TypeParam>(query, this->storage, ExpectScanAllById(), ExpectProduce()); CheckPlan<TypeParam>(query, this->storage, ExpectScanAllById(), ExpectProduce());
} }
TYPED_TEST(TestPlanner, ScanAllByEdgeId) {
// Test MATCH ()-[r]->() WHERE id(r) = 42 RETURN r
auto *query = QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("anon1"), EDGE("r"), NODE("anon2"))),
WHERE(EQ(FN("id", IDENT("r")), LITERAL(42))), RETURN("r")));
CheckPlan<TypeParam>(query, this->storage, ExpectScanAllByEdgeId(), ExpectProduce());
}
TYPED_TEST(TestPlanner, BfsToExisting) { TYPED_TEST(TestPlanner, BfsToExisting) {
// Test MATCH (n)-[r *bfs]-(m) WHERE id(m) = 42 RETURN r // Test MATCH (n)-[r *bfs]-(m) WHERE id(m) = 42 RETURN r
auto *bfs = this->storage.template Create<memgraph::query::EdgeAtom>( auto *bfs = this->storage.template Create<memgraph::query::EdgeAtom>(

View File

@ -66,6 +66,7 @@ class PlanChecker : public virtual HierarchicalLogicalOperatorVisitor {
PRE_VISIT(ScanAllByLabelPropertyRange); PRE_VISIT(ScanAllByLabelPropertyRange);
PRE_VISIT(ScanAllByLabelProperty); PRE_VISIT(ScanAllByLabelProperty);
PRE_VISIT(ScanAllByEdgeType); PRE_VISIT(ScanAllByEdgeType);
PRE_VISIT(ScanAllByEdgeId);
PRE_VISIT(ScanAllById); PRE_VISIT(ScanAllById);
PRE_VISIT(Expand); PRE_VISIT(Expand);
PRE_VISIT(ExpandVariable); PRE_VISIT(ExpandVariable);
@ -172,6 +173,7 @@ using ExpectDelete = OpChecker<Delete>;
using ExpectScanAll = OpChecker<ScanAll>; using ExpectScanAll = OpChecker<ScanAll>;
using ExpectScanAllByLabel = OpChecker<ScanAllByLabel>; using ExpectScanAllByLabel = OpChecker<ScanAllByLabel>;
using ExpectScanAllByEdgeType = OpChecker<ScanAllByEdgeType>; using ExpectScanAllByEdgeType = OpChecker<ScanAllByEdgeType>;
using ExpectScanAllByEdgeId = OpChecker<ScanAllByEdgeId>;
using ExpectScanAllById = OpChecker<ScanAllById>; using ExpectScanAllById = OpChecker<ScanAllById>;
using ExpectExpand = OpChecker<Expand>; using ExpectExpand = OpChecker<Expand>;
using ExpectConstructNamedPath = OpChecker<ConstructNamedPath>; using ExpectConstructNamedPath = OpChecker<ConstructNamedPath>;

View File

@ -2965,6 +2965,7 @@ TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) {
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)}; memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::utils::SkipList<memgraph::storage::Vertex> vertices; memgraph::utils::SkipList<memgraph::storage::Vertex> vertices;
memgraph::utils::SkipList<memgraph::storage::Edge> edges; memgraph::utils::SkipList<memgraph::storage::Edge> edges;
memgraph::utils::SkipList<memgraph::storage::EdgeMetadata> edges_metadata;
std::unique_ptr<memgraph::storage::NameIdMapper> name_id_mapper = std::make_unique<memgraph::storage::NameIdMapper>(); std::unique_ptr<memgraph::storage::NameIdMapper> name_id_mapper = std::make_unique<memgraph::storage::NameIdMapper>();
std::atomic<uint64_t> edge_count{0}; std::atomic<uint64_t> edge_count{0};
uint64_t wal_seq_num{0}; uint64_t wal_seq_num{0};
@ -2978,7 +2979,7 @@ TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) {
config.durability.storage_directory / memgraph::storage::durability::kWalDirectory}; config.durability.storage_directory / memgraph::storage::durability::kWalDirectory};
// Recover snapshot. // Recover snapshot.
const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edge_count, const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edges_metadata, &edge_count,
name_id_mapper.get(), &indices, &constraints, config, &wal_seq_num); name_id_mapper.get(), &indices, &constraints, config, &wal_seq_num);
MG_ASSERT(info.has_value(), "Info doesn't have value present"); MG_ASSERT(info.has_value(), "Info doesn't have value present");
@ -3044,3 +3045,61 @@ TEST_P(DurabilityTest, EdgeTypeIndexRecovered) {
ASSERT_FALSE(acc->Commit().HasError()); ASSERT_FALSE(acc->Commit().HasError());
} }
} }
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_P(DurabilityTest, EdgeMetadataRecovered) {
if (GetParam() == false) {
return;
}
// Create snapshot.
{
memgraph::storage::Config config{.salient.items = {.properties_on_edges = GetParam()},
.durability = {.storage_directory = storage_directory, .snapshot_on_exit = true}};
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::dbms::Database db{config, repl_state};
CreateBaseDataset(db.storage(), GetParam());
VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam());
}
ASSERT_EQ(GetSnapshotsList().size(), 1);
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
ASSERT_EQ(GetWalsList().size(), 0);
ASSERT_EQ(GetBackupWalsList().size(), 0);
// Recover snapshot.
memgraph::storage::Config config{.salient.items = {.properties_on_edges = GetParam(), .enable_edges_metadata = true},
.durability = {.storage_directory = storage_directory, .recover_on_startup = true}};
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::dbms::Database db{config, repl_state};
VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam());
// Check if data has been loaded correctly.
{
auto acc = db.Access();
for (auto i{0U}; i < kNumBaseEdges; ++i) {
auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(i), memgraph::storage::View::OLD);
ASSERT_TRUE(edge.has_value());
}
auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges), memgraph::storage::View::OLD);
ASSERT_FALSE(edge.has_value());
auto vertex = acc->CreateVertex();
auto new_edge = acc->CreateEdge(&vertex, &vertex, db.storage()->NameToEdgeType("et"));
ASSERT_TRUE(new_edge.HasValue());
ASSERT_FALSE(acc->Commit().HasError());
}
{
auto acc = db.Access();
auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges), memgraph::storage::View::OLD);
ASSERT_TRUE(edge.has_value());
edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges + 1), memgraph::storage::View::OLD);
ASSERT_FALSE(edge.has_value());
ASSERT_FALSE(acc->Commit().HasError());
}
}